[Source Code Reading 1.1.0] Flink jar task submission
liujian committed Sep 14, 2024
1 parent 9a29e52 commit 1681a57
Showing 16 changed files with 42 additions and 18 deletions.
@@ -85,6 +85,7 @@ public class TaskController {
@ExecuteProcess(type = ProcessType.FLINK_SUBMIT)
@CheckTaskOwner(checkParam = TaskId.class, checkInterface = TaskService.class)
public Result<JobResult> submitTask(@TaskId @ProcessId @RequestParam Integer id) throws Exception {
//todo submit the task
JobResult jobResult =
taskService.submitTask(TaskSubmitDto.builder().id(id).build());
if (jobResult.isSuccess()) {
@@ -180,7 +180,7 @@ private String[] buildParams(int id) {
.encodeToString(JsonUtils.toJsonString(appParamConfig).getBytes());
return StrFormatter.format("--config {}", encodeParam).split(" ");
}

//todo step 1
@ProcessStep(type = ProcessStepType.SUBMIT_PRECHECK)
public TaskDTO prepareTask(TaskSubmitDto submitDto) {
TaskDTO task = this.getTaskInfoById(submitDto.getId());
@@ -199,6 +199,7 @@ public TaskDTO prepareTask(TaskSubmitDto submitDto) {

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJob(TaskDTO task) throws Exception {
//todo resolve the task type and submit the task
JobResult jobResult = BaseTask.getTask(task).execute();
log.info("execute job finished,status is {}", jobResult.getStatus());
return jobResult;
@@ -215,13 +216,14 @@ public JobResult executeJob(TaskDTO task, Boolean stream) throws Exception {
log.info("execute job finished,status is {}", jobResult.getStatus());
return jobResult;
}

//todo SUBMIT_BUILD_CONFIG
@Override
@ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG)
public JobConfig buildJobSubmitConfig(TaskDTO task) {
if (Asserts.isNull(task.getType())) {
task.setType(GatewayType.LOCAL.getLongValue());
}
//todo start initializing the FlinkSQLEnv
task.setStatement(buildEnvSql(task) + task.getStatement());
JobConfig config = task.getJobConfig();
Savepoints savepoints = savepointsService.getSavePointWithStrategy(task);
@@ -327,9 +329,11 @@ public String buildEnvSql(AbstractStatementDTO task) {
public JobResult submitTask(TaskSubmitDto submitDto) throws Exception {
// Annotation self-invocation does not go through the Spring proxy; fetching the bean from the context works around this limitation
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
//todo step 1: check and configure the task
TaskDTO taskDTO = taskServiceBean.prepareTask(submitDto);
// The statement set is enabled by default when submitting jobs
taskDTO.setStatementSet(true);
//todo step 2: submit the task
JobResult jobResult = taskServiceBean.executeJob(taskDTO);
if ((jobResult.getStatus() == Job.JobStatus.FAILED)) {
throw new RuntimeException(jobResult.getError());
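Side note on the workaround annotated above: Spring applies @ProcessStep through a proxy, so a call made via "this" from inside the same bean never reaches the proxy and the step advice is silently skipped. A minimal sketch of the pitfall, using a hypothetical DemoService (ProcessStep and ProcessStepType are the Dinky annotations shown in this diff):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

@Service
public class DemoService {

    @Autowired
    private ApplicationContext applicationContext;

    public void outer() {
        // this.step();  // plain Java call: bypasses the proxy, advice skipped
        // Fetching the bean returns the proxy, so the advice runs:
        applicationContext.getBean(DemoService.class).step();
    }

    @ProcessStep(type = ProcessStepType.SUBMIT_PRECHECK)
    public void step() { /* ... */ }
}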
@@ -70,6 +70,7 @@ public JobResult execute() throws Exception {

protected JobManager getJobManager() {
TaskService taskService = SpringUtil.getBean(TaskServiceImpl.class);
//todo SUBMIT_BUILD_CONFIG
return JobManager.build(taskService.buildJobSubmitConfig(task));
}

@@ -50,6 +50,7 @@ public static void main(String[] args) throws Exception {
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
//todo dinky main class
Submitter.submit(appConfig);
}
}
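The --config argument consumed here is produced by buildParams() in an earlier hunk: the AppParamConfig JSON is base64-encoded on the server and decoded again inside the app. A self-contained round-trip sketch (the JSON payload is made up):

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ConfigRoundTrip {
    public static void main(String[] args) {
        // Server side (buildParams): JSON -> base64 -> "--config <b64>"
        String json = "{\"taskId\":1}"; // hypothetical AppParamConfig payload
        String encoded = Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
        String[] jarArgs = ("--config " + encoded).split(" ");

        // App side (MainApp): decode back to JSON, then bind to AppParamConfig
        String decoded = new String(Base64.getDecoder().decode(jarArgs[1]), StandardCharsets.UTF_8);
        System.out.println(decoded); // {"taskId":1}
    }
}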
@@ -258,6 +258,7 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
for (String statement : statements) {
if (ExecuteJarParseStrategy.INSTANCE.match(statement)) {
ExecuteJarOperation executeJarOperation = new ExecuteJarOperation(statement);
//todo obtain the StreamGraph
Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment());
ReadableConfig configuration =
executor.getStreamExecutionEnvironment().getConfiguration();
@@ -71,6 +71,7 @@ public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
}

public Pipeline getStreamGraph(CustomTableEnvironment tEnv) {
//todo delegates to the overload below with an empty classpath list
return getStreamGraph(tEnv, Collections.emptyList());
}

@@ -79,7 +80,7 @@ public Pipeline getStreamGraph(CustomTableEnvironment tEnv, List<URL> classpaths
JarSubmitParam submitParam = JarSubmitParam.build(statement);
return getStreamGraph(submitParam, tEnv, classpaths);
}

//todo note the difference between per-job and application mode here, mainly where the jar and its arguments come from
public static Pipeline getStreamGraph(
JarSubmitParam submitParam, CustomTableEnvironment tEnv, List<URL> classpaths) {
SavepointRestoreSettings savepointRestoreSettings = StrUtil.isBlank(submitParam.getSavepointPath())
@@ -89,10 +90,12 @@ public static Pipeline getStreamGraph(
PackagedProgram program;
try {
Configuration configuration = tEnv.getConfig().getConfiguration();
//todo URLUtils::toFile (download the user jar from the resource center to local disk)
File file =
Opt.ofBlankAble(submitParam.getUri()).map(URLUtils::toFile).orElse(null);
String submitArgs = Opt.ofBlankAble(submitParam.getArgs()).orElse("");
if (!PackagedProgramUtils.isPython(submitParam.getMainClass())) {
//todo add the user-defined jar
tEnv.addJar(file);
} else {
// python submit
@@ -105,13 +108,14 @@ public static Pipeline getStreamGraph(
.setEntryPointClassName(submitParam.getMainClass())
.setConfiguration(configuration)
.setSavepointRestoreSettings(savepointRestoreSettings)
// todo set args
// todo set the args of the user-defined jar
.setArguments(extractArgs(submitArgs.trim()).toArray(new String[0]))
.setUserClassPaths(classpaths)
.build();
int parallelism = StrUtil.isNumeric(submitParam.getParallelism())
? Convert.toInt(submitParam.getParallelism())
: tEnv.getStreamExecutionEnvironment().getParallelism();
//todo enter the flink client!!!
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, parallelism, true);

// When the UserCodeClassLoader is used to obtain the JobGraph in advance,
@@ -185,9 +189,10 @@ public Pipeline explain(CustomTableEnvironment tEnv, List<URL> classpaths) {
@Getter
public static class JarSubmitParam {
protected JarSubmitParam() {}

//todo location of the jar referenced by the EXECUTE JAR statement
private String uri;
private String mainClass;
//todo main args of the user-defined jar
private String args;
private String parallelism;
private String savepointPath;
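To make the per-job path above concrete: a sketch of turning a jar into a Pipeline with Flink's client API, mirroring the PackagedProgramUtils call annotated in the hunk. The jar path, main class, and args are placeholders:

import java.io.File;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;

public class JarToPipeline {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/user/app.jar"))  // hypothetical jar
                .setEntryPointClassName("com.example.Main")     // hypothetical main class
                .setArguments("--input", "/data/in")            // hypothetical args
                .setConfiguration(configuration)
                .build();
        // suppressOutput = true, matching the annotated call
        Pipeline pipeline =
                PackagedProgramUtils.getPipelineFromProgram(program, configuration, 1, true);
        System.out.println(pipeline.getClass().getSimpleName()); // e.g. StreamGraph
    }
}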
4 changes: 2 additions & 2 deletions dinky-client/dinky-client-hadoop/pom.xml
@@ -31,10 +31,10 @@
<name>Dinky : Client : Hadoop</name>

<properties>
<!--<scope.type>provided</scope.type>-->
<hadoop.version>3.3.2</hadoop.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scope.type>compile</scope.type>
<scope.type>provided</scope.type>
<!-- <scope.type>compile</scope.type>-->
</properties>

<dependencies>
@@ -31,7 +31,7 @@ public final class CommonConstant {

/** Instance health */
public static final String HEALTHY = "1";

//todo dinky main-class
public static final String DINKY_APP_MAIN_CLASS = "org.dinky.app.MainApp";
public static final String LineSep = System.getProperty("line.separator");

1 change: 1 addition & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -210,6 +210,7 @@ public static JobManager buildPlanMode(JobConfig config) {
public void init() {
if (!isPlanMode) {
runMode = GatewayType.get(config.getType());
//todo application and per-job use gateway mode
useGateway = GatewayType.isDeployCluster(config.getType());
handler = JobHandler.build();
}
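A hedged sketch of the decision that todo describes; the real logic lives in GatewayType.isDeployCluster, and the exact type values here are assumptions:

public class GatewayTypeSketch {
    // Cluster-deploying targets create a new cluster per submission and need
    // a Gateway; session/local targets submit to an already-running cluster.
    static boolean isDeployCluster(String type) {
        switch (type) {
            case "yarn-per-job":
            case "yarn-application":
            case "kubernetes-application":
                return true;  // gateway mode
            default:
                return false; // e.g. local, standalone, yarn-session
        }
    }

    public static void main(String[] args) {
        System.out.println(isDeployCluster("yarn-application")); // true
    }
}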
@@ -93,6 +93,7 @@ private Pipeline getPipeline() {

@Override
public void run() throws Exception {
//todo application and per-job use gateway mode; session mode does not
if (!useGateway) {
submitNormal();
} else {
@@ -142,6 +143,7 @@ private GatewayResult submitNormalWithGateway() {
.map(File::getAbsolutePath)
.toArray(String[]::new);
gatewayConfig.setJarPaths(jarPaths);
//todo submit
return Gateway.build(gatewayConfig).submitJobGraph(jobGraph);
}

@@ -31,21 +31,21 @@
@Data
@ApiModel(value = "AppConfig", description = "Configuration for the Flink application")
public class AppConfig {

//todo the user program's jar
@ApiModelProperty(
value = "Path to user JAR file",
dataType = "String",
example = "/path/to/user/app.jar",
notes = "Path to the user's application JAR file")
private String userJarPath;

//todo the user program's config file
@ApiModelProperty(
value = "User JAR file parameters",
dataType = "String[]",
example = "[]",
notes = "Parameters to be passed to the user's application JAR file")
private String[] userJarParas;

//todo the user program's main class
@ApiModelProperty(
value = "Main application class in the JAR file",
dataType = "String",
@@ -40,21 +40,21 @@
@Setter
@ApiModel(value = "ClusterConfig", description = "Configuration for a Flink cluster")
public class ClusterConfig {

//todo path to the Flink configuration file in the cluster config
@ApiModelProperty(
value = "Path to Flink configuration file",
dataType = "String",
example = "/opt/flink/conf/flink-conf.yaml",
notes = "Path to the Flink configuration file")
private String flinkConfigPath;

//todo path to the Flink lib directory in the cluster config
@ApiModelProperty(
value = "Path to Flink library directory",
dataType = "String",
example = "/opt/flink/lib",
notes = "Path to the Flink library directory")
private String flinkLibPath;

//todo path to the Hadoop configuration file in the cluster config
@ApiModelProperty(
value = "Path to YARN configuration file",
dataType = "String",
@@ -50,6 +50,7 @@
*
* @since 2021/10/29
*/
//todo application mode
public class YarnApplicationGateway extends YarnGateway {

@Override
@@ -74,20 +75,23 @@ private String formatUrl(String url) {
@Override
public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
if (Asserts.isNull(yarnClient)) {
//todo initialize
init();
}

AppConfig appConfig = config.getAppConfig();
//todo upload the user program's jar
configuration.set(PipelineOptions.JARS, Collections.singletonList(formatUrl(appConfig.getUserJarPath())));
configuration.setString(
"python.files",
udfPathContextHolder.getPyUdfFile().stream().map(File::getName).collect(Collectors.joining(",")));

//todo upload the user program's config file
String[] userJarParas =
Asserts.isNotNull(appConfig.getUserJarParas()) ? appConfig.getUserJarParas() : new String[0];

ClusterSpecification.ClusterSpecificationBuilder clusterSpecificationBuilder =
createClusterSpecificationBuilder();
//todo the user program's main class
ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());

@@ -99,11 +103,11 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
clusterDescriptorAdapter.addShipFiles(Arrays.asList(preparSqlFile()));
addConfigParas(
CustomerConfigureOptions.EXEC_SQL_FILE, configuration.get(CustomerConfigureOptions.EXEC_SQL_FILE));

//todo run the flink job in application mode
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecificationBuilder.createClusterSpecification(), applicationConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

//todo get the web url!!!
webUrl = getWebUrl(clusterClient, result);

ApplicationId applicationId = clusterClient.getClusterId();
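The two inputs that application mode needs are both visible in the hunk above: where the user jar lives and how to start it. A minimal sketch (the HDFS path is made up; the main class is the DINKY_APP_MAIN_CLASS constant from this diff):

import java.util.Collections;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class ApplicationModeInputs {
    public static void main(String[] args) {
        // Where the jar lives: YARN localizes it; the client never runs main()
        Configuration flinkConf = new Configuration();
        flinkConf.set(PipelineOptions.JARS,
                Collections.singletonList("hdfs:///dinky/jar/dinky-app.jar")); // hypothetical path
        // How to start it: program args plus entry class; main() then runs on
        // the JobManager rather than on the Dinky server
        ApplicationConfiguration appConf = new ApplicationConfiguration(
                new String[] {"--config", "<base64-config>"}, "org.dinky.app.MainApp");
        System.out.println(appConf.getApplicationClassName());
    }
}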
@@ -119,6 +119,7 @@ private void initConfig() {
final ClusterConfig clusterConfig = config.getClusterConfig();
configuration = GlobalConfiguration.loadConfiguration(
clusterConfig.getFlinkConfigPath().trim());
//todo set classloader.resolve-order to parent-first
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

final FlinkConfig flinkConfig = config.getFlinkConfig();
Expand All @@ -131,14 +132,15 @@ private void initConfig() {
}

configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
// todo the flink lib directory can be uploaded to hdfs in advance
// todo the flink lib directory can be uploaded to hdfs in advance (set yarn.provided.lib.dirs)
configuration.set(
YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(clusterConfig.getFlinkLibPath()));
if (Asserts.isNotNullString(flinkConfig.getJobName())) {
configuration.set(YarnConfigOptions.APPLICATION_NAME, flinkConfig.getJobName());
}

if (Asserts.isNotNullString(clusterConfig.getHadoopConfigPath())) {
//todo fs.hdfs.hadoopconf
configuration.setString(
ConfigConstants.PATH_HADOOP_CONFIG,
FileUtil.file(clusterConfig.getHadoopConfigPath()).getAbsolutePath());
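The yarn.provided.lib.dirs todo above points at a submission-time optimization: if the Flink dist jars are pre-uploaded to HDFS, YARN localizes them from there and the client skips shipping them on every submission. A minimal sketch (the HDFS path is an assumption):

import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

public class ProvidedLibDirs {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same option the hunk sets from clusterConfig.getFlinkLibPath()
        conf.set(YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList("hdfs:///flink/lib")); // hypothetical path
    }
}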
@@ -54,6 +54,7 @@ public GatewayType getType() {
@Override
public GatewayResult submitJobGraph(JobGraph jobGraph) {
if (Asserts.isNull(yarnClient)) {
//todo initialize
init();
}

@@ -54,6 +54,7 @@ public GatewayResult deployCluster(FlinkUdfPathContextHolder udfPathContextHolde

YarnResult result = YarnResult.build(getType());
try (YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptorWithJar(udfPathContextHolder)) {
//todo start the session cluster
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deploySessionCluster(
clusterSpecificationBuilder.createClusterSpecification());
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();