diff --git a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java index a8e9f63957..c865a08847 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java @@ -85,6 +85,7 @@ public class TaskController { @ExecuteProcess(type = ProcessType.FLINK_SUBMIT) @CheckTaskOwner(checkParam = TaskId.class, checkInterface = TaskService.class) public Result submitTask(@TaskId @ProcessId @RequestParam Integer id) throws Exception { + //todo 提交task JobResult jobResult = taskService.submitTask(TaskSubmitDto.builder().id(id).build()); if (jobResult.isSuccess()) { diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index d2a85c6b8f..7edd90e87c 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -180,7 +180,7 @@ private String[] buildParams(int id) { .encodeToString(JsonUtils.toJsonString(appParamConfig).getBytes()); return StrFormatter.format("--config {}", encodeParam).split(" "); } - + //todo 第一步 @ProcessStep(type = ProcessStepType.SUBMIT_PRECHECK) public TaskDTO prepareTask(TaskSubmitDto submitDto) { TaskDTO task = this.getTaskInfoById(submitDto.getId()); @@ -199,6 +199,7 @@ public TaskDTO prepareTask(TaskSubmitDto submitDto) { @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) public JobResult executeJob(TaskDTO task) throws Exception { + //todo 获取task类型,提交task JobResult jobResult = BaseTask.getTask(task).execute(); log.info("execute job finished,status is {}", jobResult.getStatus()); return jobResult; @@ -215,13 +216,14 @@ public JobResult executeJob(TaskDTO task, Boolean stream) throws Exception { log.info("execute job finished,status is {}", jobResult.getStatus()); return jobResult; } - + //todo SUBMIT_BUILD_CONFIG @Override @ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG) public JobConfig buildJobSubmitConfig(TaskDTO task) { if (Asserts.isNull(task.getType())) { task.setType(GatewayType.LOCAL.getLongValue()); } + //todo Start initialize FlinkSQLEnv task.setStatement(buildEnvSql(task) + task.getStatement()); JobConfig config = task.getJobConfig(); Savepoints savepoints = savepointsService.getSavePointWithStrategy(task); @@ -327,9 +329,11 @@ public String buildEnvSql(AbstractStatementDTO task) { public JobResult submitTask(TaskSubmitDto submitDto) throws Exception { // 注解自调用会失效,这里通过获取对象方法绕过此限制 TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class); + //todo 第一步:Start check and config task TaskDTO taskDTO = taskServiceBean.prepareTask(submitDto); // The statement set is enabled by default when submitting assignments taskDTO.setStatementSet(true); + //todo 第一步:提交task JobResult jobResult = taskServiceBean.executeJob(taskDTO); if ((jobResult.getStatus() == Job.JobStatus.FAILED)) { throw new RuntimeException(jobResult.getError()); diff --git a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java index 481f25fe41..bf4d599e4f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java @@ -70,6 +70,7 @@ public JobResult execute() throws Exception { protected JobManager getJobManager() { TaskService taskService = SpringUtil.getBean(TaskServiceImpl.class); + //todo SUBMIT_BUILD_CONFIG return JobManager.build(taskService.buildJobSubmitConfig(task)); } diff --git a/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java b/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java index 4b5fddac96..2e0c7638b1 100644 --- a/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java +++ b/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java @@ -50,6 +50,7 @@ public static void main(String[] args) throws Exception { AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class); log.info("dinky app is Ready to run, config is {}", appConfig); DBUtil.init(appConfig); + //todo dinky main class Submitter.submit(appConfig); } } diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index eec8cfc962..b7e1c3fe5b 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -258,6 +258,7 @@ public static Optional executeJarJob(String type, Executor executor, for (String statement : statements) { if (ExecuteJarParseStrategy.INSTANCE.match(statement)) { ExecuteJarOperation executeJarOperation = new ExecuteJarOperation(statement); + //todo 获取StreamGraph Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment()); ReadableConfig configuration = executor.getStreamExecutionEnvironment().getConfiguration(); diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index 6b641ba685..36880c1793 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -71,6 +71,7 @@ public Optional execute(CustomTableEnvironment tEnv) { } public Pipeline getStreamGraph(CustomTableEnvironment tEnv) { + //todo return getStreamGraph(tEnv, Collections.emptyList()); } @@ -79,7 +80,7 @@ public Pipeline getStreamGraph(CustomTableEnvironment tEnv, List classpaths JarSubmitParam submitParam = JarSubmitParam.build(statement); return getStreamGraph(submitParam, tEnv, classpaths); } - + //todo 这里要注意per-job和application模式的不同,主要是jar包地址和参数地址 public static Pipeline getStreamGraph( JarSubmitParam submitParam, CustomTableEnvironment tEnv, List classpaths) { SavepointRestoreSettings savepointRestoreSettings = StrUtil.isBlank(submitParam.getSavepointPath()) @@ -89,10 +90,12 @@ public static Pipeline getStreamGraph( PackagedProgram program; try { Configuration configuration = tEnv.getConfig().getConfiguration(); + //todo URLUtils::toFile【用户jar从资源中心下载到本地】 File file = Opt.ofBlankAble(submitParam.getUri()).map(URLUtils::toFile).orElse(null); String submitArgs = Opt.ofBlankAble(submitParam.getArgs()).orElse(""); if (!PackagedProgramUtils.isPython(submitParam.getMainClass())) { + //todo 添加用户自定义jar tEnv.addJar(file); } else { // python submit @@ -105,13 +108,14 @@ public static Pipeline getStreamGraph( .setEntryPointClassName(submitParam.getMainClass()) .setConfiguration(configuration) .setSavepointRestoreSettings(savepointRestoreSettings) - // todo 设置args + // todo 设置用户自定义jar的args .setArguments(extractArgs(submitArgs.trim()).toArray(new String[0])) .setUserClassPaths(classpaths) .build(); int parallelism = StrUtil.isNumeric(submitParam.getParallelism()) ? Convert.toInt(submitParam.getParallelism()) : tEnv.getStreamExecutionEnvironment().getParallelism(); + //todo 进入flink client!!!!!!! Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, parallelism, true); // When the UserCodeClassLoader is used to obtain the JobGraph in advance, @@ -185,9 +189,10 @@ public Pipeline explain(CustomTableEnvironment tEnv, List classpaths) { @Getter public static class JarSubmitParam { protected JarSubmitParam() {} - + //todo 执行sql jar中jar包的地址 private String uri; private String mainClass; + //todo 用户自定义jar main参数 private String args; private String parallelism; private String savepointPath; diff --git a/dinky-client/dinky-client-hadoop/pom.xml b/dinky-client/dinky-client-hadoop/pom.xml index 3cadb5ec68..5cc2ed18c6 100644 --- a/dinky-client/dinky-client-hadoop/pom.xml +++ b/dinky-client/dinky-client-hadoop/pom.xml @@ -31,10 +31,10 @@ Dinky : Client : Hadoop - 3.3.2 UTF-8 - compile + provided + diff --git a/dinky-common/src/main/java/org/dinky/data/constant/CommonConstant.java b/dinky-common/src/main/java/org/dinky/data/constant/CommonConstant.java index 5e1d7e8afd..364ab19935 100644 --- a/dinky-common/src/main/java/org/dinky/data/constant/CommonConstant.java +++ b/dinky-common/src/main/java/org/dinky/data/constant/CommonConstant.java @@ -31,7 +31,7 @@ public final class CommonConstant { /** 实例健康 */ public static final String HEALTHY = "1"; - + //todo dinky main-class public static final String DINKY_APP_MAIN_CLASS = "org.dinky.app.MainApp"; public static final String LineSep = System.getProperty("line.separator"); diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 210674d790..527cb58f80 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -210,6 +210,7 @@ public static JobManager buildPlanMode(JobConfig config) { public void init() { if (!isPlanMode) { runMode = GatewayType.get(config.getType()); + //todo application和per-job使用gateway模式 useGateway = GatewayType.isDeployCluster(config.getType()); handler = JobHandler.build(); } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java index 9335e09ad8..533845e978 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java @@ -93,6 +93,7 @@ private Pipeline getPipeline() { @Override public void run() throws Exception { + //todo application 和 per-job 使用gateway模式,session模式使用非gateway模式 if (!useGateway) { submitNormal(); } else { @@ -142,6 +143,7 @@ private GatewayResult submitNormalWithGateway() { .map(File::getAbsolutePath) .toArray(String[]::new); gatewayConfig.setJarPaths(jarPaths); + //todo 提交 return Gateway.build(gatewayConfig).submitJobGraph(jobGraph); } diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/config/AppConfig.java b/dinky-gateway/src/main/java/org/dinky/gateway/config/AppConfig.java index 9b9d3a5c6f..d50893c59c 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/config/AppConfig.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/config/AppConfig.java @@ -31,21 +31,21 @@ @Data @ApiModel(value = "AppConfig", description = "Configuration for the Flink application") public class AppConfig { - + //todo 用户程序的jar @ApiModelProperty( value = "Path to user JAR file", dataType = "String", example = "/path/to/user/app.jar", notes = "Path to the user's application JAR file") private String userJarPath; - + //todo 用户程序的配置文件 @ApiModelProperty( value = "User JAR file parameters", dataType = "String[]", example = "[]", notes = "Parameters to be passed to the user's application JAR file") private String[] userJarParas; - + //todo 用户程序的main class @ApiModelProperty( value = "Main application class in the JAR file", dataType = "String", diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/config/ClusterConfig.java b/dinky-gateway/src/main/java/org/dinky/gateway/config/ClusterConfig.java index 41967d9991..40f8e07278 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/config/ClusterConfig.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/config/ClusterConfig.java @@ -40,21 +40,21 @@ @Setter @ApiModel(value = "ClusterConfig", description = "Configuration for a Flink cluster") public class ClusterConfig { - + //todo 集群配置中的 Flink 配置文件路径 @ApiModelProperty( value = "Path to Flink configuration file", dataType = "String", example = "/opt/flink/conf/flink-conf.yaml", notes = "Path to the Flink configuration file") private String flinkConfigPath; - + //todo 集群配置中的 Flink Lib 路径 @ApiModelProperty( value = "Path to Flink library directory", dataType = "String", example = "/opt/flink/lib", notes = "Path to the Flink library directory") private String flinkLibPath; - + //todo 集群配置中的 Hadoop 配置文件路径 @ApiModelProperty( value = "Path to YARN configuration file", dataType = "String", diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java index bbbf959c2e..00894e257c 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java @@ -50,6 +50,7 @@ * * @since 2021/10/29 */ +//todo application模式 public class YarnApplicationGateway extends YarnGateway { @Override @@ -74,20 +75,23 @@ private String formatUrl(String url) { @Override public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { if (Asserts.isNull(yarnClient)) { + //todo 初始化 init(); } AppConfig appConfig = config.getAppConfig(); + //todo 上传用户程序的jar包 configuration.set(PipelineOptions.JARS, Collections.singletonList(formatUrl(appConfig.getUserJarPath()))); configuration.setString( "python.files", udfPathContextHolder.getPyUdfFile().stream().map(File::getName).collect(Collectors.joining(","))); - + //todo 上传用户程序的配置文件 String[] userJarParas = Asserts.isNotNull(appConfig.getUserJarParas()) ? appConfig.getUserJarParas() : new String[0]; ClusterSpecification.ClusterSpecificationBuilder clusterSpecificationBuilder = createClusterSpecificationBuilder(); + //todo 用户程序的main class ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass()); @@ -99,11 +103,11 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) { clusterDescriptorAdapter.addShipFiles(Arrays.asList(preparSqlFile())); addConfigParas( CustomerConfigureOptions.EXEC_SQL_FILE, configuration.get(CustomerConfigureOptions.EXEC_SQL_FILE)); - + //todo application模式执行flink任务 ClusterClientProvider clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster( clusterSpecificationBuilder.createClusterSpecification(), applicationConfiguration); ClusterClient clusterClient = clusterClientProvider.getClusterClient(); - + //todo 获取weburl!!!!!! webUrl = getWebUrl(clusterClient, result); ApplicationId applicationId = clusterClient.getClusterId(); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index f2f25241d6..a57eb47dd0 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -119,6 +119,7 @@ private void initConfig() { final ClusterConfig clusterConfig = config.getClusterConfig(); configuration = GlobalConfiguration.loadConfiguration( clusterConfig.getFlinkConfigPath().trim()); + //todo 设置classloader.resolve-order:parent-first configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first"); final FlinkConfig flinkConfig = config.getFlinkConfig(); @@ -131,7 +132,7 @@ private void initConfig() { } configuration.set(DeploymentOptions.TARGET, getType().getLongValue()); - // todo flink lib可预先上传到hdfs上 + // todo flink lib可预先上传到hdfs上【设置yarn.provided.lib.dirs】 configuration.set( YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(clusterConfig.getFlinkLibPath())); if (Asserts.isNotNullString(flinkConfig.getJobName())) { @@ -139,6 +140,7 @@ private void initConfig() { } if (Asserts.isNotNullString(clusterConfig.getHadoopConfigPath())) { + //todo fs.hdfs.hadoopconf configuration.setString( ConfigConstants.PATH_HADOOP_CONFIG, FileUtil.file(clusterConfig.getHadoopConfigPath()).getAbsolutePath()); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java index c7d09d3a47..f7ed3eac3e 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java @@ -54,6 +54,7 @@ public GatewayType getType() { @Override public GatewayResult submitJobGraph(JobGraph jobGraph) { if (Asserts.isNull(yarnClient)) { + //todo 初始化 init(); } diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnSessionGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnSessionGateway.java index 0bea0652c2..338c37fd16 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnSessionGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnSessionGateway.java @@ -54,6 +54,7 @@ public GatewayResult deployCluster(FlinkUdfPathContextHolder udfPathContextHolde YarnResult result = YarnResult.build(getType()); try (YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptorWithJar(udfPathContextHolder)) { + //todo 启动session集群 ClusterClientProvider clusterClientProvider = yarnClusterDescriptor.deploySessionCluster( clusterSpecificationBuilder.createClusterSpecification()); ClusterClient clusterClient = clusterClientProvider.getClusterClient();