[Feature-16699][Task Plugin] Flink submit option parameter supports placeholder substitution #16703

Open · wants to merge 8 commits into base branch dev
2 changes: 1 addition & 1 deletion docs/docs/en/guide/task/flink.md
@@ -37,7 +37,7 @@ Flink task type, used to execute Flink programs. For Flink nodes:
| Parallelism | Used to set the degree of parallelism for executing Flink tasks. |
| Yarn queue | Used to set the yarn queue, use `default` queue by default. |
| Main program parameters | Set the input parameters for the Flink program and support the substitution of custom parameter variables. |
-| Optional parameters | Set the flink command options, such as `-D`, `-C`, `-yt`. |
+| Optional parameters | Set the flink command options, such as `-D`, `-C`, `-yt`, and support the substitution of custom parameter variables; for example, in `-Dyarn.application.name=${job_name}` the custom parameter `job_name` will be replaced. |
Member:

Is this custom parameter `job_name` set by users as a hard-coded value? If so, this is meaningless. We should add a separate field option for users to use instead of custom parameters, which are suited to parameter passing and global parameters.

Author:

Not hard-coded; the user passes a custom parameter and it is replaced at runtime, the same way main program parameters work:
[screenshot: custom parameter configuration]

Member:

As I said, custom parameters are suitable for parameter passing between tasks in a workflow and for global parameters. `yarn.application.name` is a task-level parameter with a different function. We should not add it here.

Author:

I understand the boundary with custom parameters, but option parameter replacement is also specific to the Flink task.

Member:

> Option parameter replacement is also specific to the Flink task.

I don't think we should confuse usage of custom parameters with task parameters. Sorry, I'm -1 on this.

Author:

I understand your idea, but I have a question: can't custom parameters be used as task parameters? Currently the Flink task's main program parameters can be replaced with custom parameters.

Member:

> I understand your idea, but I have a question: can't custom parameters be used as task parameters? Currently the Flink task's main program parameters can be replaced with custom parameters.

I didn't say that custom parameters can't be used as task parameters. What I mean is that `job_name` should be added to the task parameters instead of the custom parameters.

After taking a look at the dev branch, I found that the job_name already exists in the task params, as org.apache.dolphinscheduler.plugin.task.flink.FlinkParameters#appName. But it is only used in Flink SQL. We can simply add it to the Flink jar mode as well, and this issue can be easily solved.
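To illustrate the reviewer's alternative rather than prescribe it, here is a minimal sketch of what reusing `appName` in jar mode could look like inside FlinkArgsUtils. The helper name and its insertion point are hypothetical, not the actual DolphinScheduler implementation:

```java
// Hypothetical helper sketching the reviewer's suggestion: reuse the existing
// FlinkParameters#appName (today only applied in Flink SQL mode) to set
// yarn.application.name for the jar modes as well.
private static void appendYarnApplicationName(List<String> args, FlinkParameters flinkParameters) {
    String appName = flinkParameters.getAppName();
    if (StringUtils.isNotEmpty(appName)) {
        // Produces e.g. "-Dyarn.application.name=demo-app-name"
        args.add(String.format("-Dyarn.application.name=%s", appName));
    }
}
```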

| Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. |

## Task Example
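To make the documented behavior concrete, the following self-contained sketch shows how `${...}` placeholder substitution in the option string could work. It mimics `ParameterUtils.convertParameterPlaceholders` with a plain regex and is an illustration only, not the actual DolphinScheduler utility:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSubstitutionDemo {

    // Stand-in for ParameterUtils.convertParameterPlaceholders: replaces every
    // ${name} occurrence with the matching value from the parameter map and
    // leaves unknown placeholders untouched.
    static String convertPlaceholders(String template, Map<String, String> params) {
        Pattern pattern = Pattern.compile("\\$\\{([^}]+)}");
        Matcher matcher = pattern.matcher(template);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String replacement = params.getOrDefault(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("job_name", "demo-app-name");
        // Prints: -Dyarn.application.name=demo-app-name
        System.out.println(convertPlaceholders("-Dyarn.application.name=${job_name}", params));
    }
}
```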
2 changes: 1 addition & 1 deletion docs/docs/zh/guide/task/flink.md
@@ -37,7 +37,7 @@ Flink task type, used to execute Flink programs. For Flink nodes:
| Parallelism | Used to set the degree of parallelism for executing Flink tasks |
| Yarn queue | Used to set the Yarn queue, uses the default queue by default |
| Main program parameters | Set the input parameters for the Flink program, supports the substitution of custom parameter variables |
-| Optional parameters | Set the Flink command options, such as `-D`, `-C`, `-yt` |
+| Optional parameters | Set the Flink command options, such as `-D`, `-C`, `-yt`, supports the substitution of custom parameter variables; for example, in `-Dyarn.application.name=${job_name}` the custom parameter `job_name` will be replaced |
| Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script |

## Task Example
@@ -112,7 +112,8 @@ private static List<String> buildRunCommandLineForSql(TaskExecutionContext taskE

String others = flinkParameters.getOthers();
if (StringUtils.isNotEmpty(others)) {
-args.add(others);
+Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+args.add(ParameterUtils.convertParameterPlaceholders(others, ParameterUtils.convert(paramsMap)));
}
return args;
}
@@ -271,7 +272,8 @@ private static List<String> buildRunCommandLineForOthers(TaskExecutionContext ta

// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
-args.add(others);
+Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+args.add(ParameterUtils.convertParameterPlaceholders(others, ParameterUtils.convert(paramsMap)));
}

// determine yarn queue
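For concreteness, this is what the new code path does at runtime: given a prepared parameter map containing `job_name`, the option string is rewritten before being appended to the command line. The parameter value below is illustrative; the calls are the same ones the patch uses:

```java
// Illustrative parameter map; in production it comes from
// taskExecutionContext.getPrepareParamsMap().
Map<String, Property> paramsMap = new HashMap<>();
paramsMap.put("job_name",
        Property.builder().type(DataType.VARCHAR).prop("job_name").value("my-flink-job").build());

String others = "-Dyarn.application.name=${job_name}";

// Same call chain as the patch: substitutes ${job_name} and yields
// "-Dyarn.application.name=my-flink-job".
String substituted = ParameterUtils.convertParameterPlaceholders(others, ParameterUtils.convert(paramsMap));
```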
@@ -18,10 +18,14 @@
package org.apache.dolphinscheduler.plugin.task.flink;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;

+import java.util.HashMap;
import java.util.List;
+import java.util.Map;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -45,6 +49,7 @@ private FlinkParameters buildTestFlinkParametersWithDeployMode(FlinkDeployMode f
flinkParameters.setAppName("demo-app-name");
flinkParameters.setJobManagerMemory("1024m");
flinkParameters.setTaskManagerMemory("1024m");
+flinkParameters.setOthers("-Dyarn.application.name=${job-name}");

return flinkParameters;
}
@@ -60,6 +65,12 @@ private TaskExecutionContext buildTestTaskExecutionContext() {
ResourceContext resourceContext = new ResourceContext();
resourceContext.addResourceItem(resourceItem);
taskExecutionContext.setResourceContext(resourceContext);
+
+Map<String, Property> parameters = new HashMap<>();
+parameters.put("job-name",
+        Property.builder().type(DataType.VARCHAR).prop("job-name").value("demo-app-name").build());
+taskExecutionContext.setPrepareParamsMap(parameters);
+
return taskExecutionContext;
}

@@ -69,7 +80,7 @@ public void testRunJarInApplicationMode() throws Exception {
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@@ -81,23 +92,23 @@ public void testRunJarInClusterMode() {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));

flinkParameters.setFlinkVersion("<1.10");
List<String> commandLine2 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));

flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine3 =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}

@@ -107,7 +118,7 @@ public void testRunJarInLocalMode() {
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@@ -118,7 +129,7 @@ public void testRunSql() {
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);

Assertions.assertEquals(
"${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
"${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql -Dyarn.application.name=demo-app-name",
joinStringListWithSpace(commandLine));
}
