From 4fb43039c79a3193d5af691fd84ebb988c6ca998 Mon Sep 17 00:00:00 2001
From: Licho
Date: Thu, 19 Oct 2023 14:04:51 +0800
Subject: [PATCH] [Fix][Flink]Fix local mode bug and FlinkSQL task type
 selected by default (#2403)

* refactor: build pattern

* feat: the first task type is selected by default.

* feat: detect remote by run mode

* fix: use GatewayType to detect remote

---------

Co-authored-by: leechor
---
 .../org/dinky/app/flinksql/Submitter.java     |  76 +++++------
 .../explainer/lineage/LineageBuilder.java     |   2 +-
 .../main/java/org/dinky/job/JobConfig.java    |   7 +-
 dinky-executor/pom.xml                        |   4 +
 .../org/dinky/executor/ExecutorConfig.java    | 119 +++++++-----
 .../org/dinky/executor/ExecutorFactory.java   |   4 +-
 .../dinky/executor/LocalStreamExecutor.java   |   2 +-
 .../interceptor/FlinkInterceptorTest.java     |   2 +-
 .../LeftContainer/Project/JobModal/index.tsx  |   3 +-
 9 files changed, 98 insertions(+), 121 deletions(-)

diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
index 26b89bebe1..d6b2b2e130 100644
--- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
+++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
@@ -99,7 +99,7 @@ private static String getFlinkSQLStatement(Integer id, DBConfig config) {
                     "{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ",
                     LocalDateTime.now(),
                     id,
-                    config.toString(),
+                    config,
                     e.getMessage(),
                     e);
         }
@@ -115,7 +115,7 @@ public static Map getTaskConfig(Integer id, DBConfig config) {
                     "{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ",
                     LocalDateTime.now(),
                     id,
-                    config.toString(),
+                    config,
                     e.getMessage(),
                     e);
         }
@@ -134,25 +134,20 @@ public static String getDbSourceSqlStatements(DBConfig dbConfig, Integer id) {

             String fragment = DBUtil.getOneByID(sqlCheck, dbConfig);
             if ("1".equals(fragment)) {
                 return DBUtil.getDbSourceSQLStatement(sql, dbConfig);
-            } else {
-                // 全局变量未开启,返回空字符串
-                logger.info("任务 {} 未开启全局变量,不进行变量加载。");
-                return "";
             }
+
+            // 全局变量未开启,返回空字符串
+            logger.info("任务 {} 未开启全局变量,不进行变量加载。", id);
         } catch (IOException | SQLException e) {
             logger.error(
-                    "{} --> 获取 数据源信息异常,请检查数据库连接,连接信息为:{} ,异常信息为:{}",
-                    LocalDateTime.now(),
-                    dbConfig.toString(),
-                    e.getMessage(),
-                    e);
+                    "{} --> 获取 数据源信息异常,请检查数据库连接,连接信息为:{} ,异常信息为:{}", LocalDateTime.now(), dbConfig, e.getMessage(), e);
         }
         return "";
     }

     public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
-        logger.info(LocalDateTime.now() + "开始提交作业 -- " + id);
+        logger.info("{}开始提交作业 -- {}", LocalDateTime.now(), id);
         if (NULL.equals(dinkyAddr)) {
             dinkyAddr = "";
         }
@@ -166,13 +161,13 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
             }
             sb.append("\n");
         }
+
         // 添加数据源全局变量
         sb.append(getDbSourceSqlStatements(dbConfig, id));
         // 添加自定义全局变量信息
         sb.append(getFlinkSQLStatement(id, dbConfig));
-        List statements = Submitter.getStatements(sb.toString());
-        ExecutorConfig executorConfig = ExecutorConfig.buildFromMap(taskConfig);
+        ExecutorConfig executorConfig = ExecutorConfig.buildFromMap(taskConfig);

         // 加载第三方jar
         loadDep(taskConfig.get("type"), id, dinkyAddr, executorConfig);
@@ -181,11 +176,14 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
         List ddl = new ArrayList<>();
         List trans = new ArrayList<>();
         List execute = new ArrayList<>();
+
+        List statements = Submitter.getStatements(sb.toString());
         for (String item : statements) {
             String statement = FlinkInterceptor.pretreatStatement(executor, item);
             if (statement.isEmpty()) {
                 continue;
             }
+
             SqlType operationType = Operations.getOperationType(statement);
             if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
                 trans.add(new StatementParam(statement, operationType));
@@ -201,12 +199,14 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
                 ddl.add(new StatementParam(statement, operationType));
             }
         }
+
         for (StatementParam item : ddl) {
-            logger.info("Executing FlinkSQL: " + item.getValue());
+            logger.info("Executing FlinkSQL: {}", item.getValue());
             executor.executeSql(item.getValue());
             logger.info("Execution succeeded.");
         }
-        if (trans.size() > 0) {
+
+        if (!trans.isEmpty()) {
             if (executorConfig.isUseStatementSet()) {
                 List inserts = new ArrayList<>();
                 for (StatementParam item : trans) {
@@ -214,19 +214,18 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
                         inserts.add(item.getValue());
                     }
                 }
-                logger.info("Executing FlinkSQL statement set: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
+                logger.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts));
                 executor.executeStatementSet(inserts);
                 logger.info("Execution succeeded.");
             } else {
-                for (StatementParam item : trans) {
-                    logger.info("Executing FlinkSQL: " + item.getValue());
-                    executor.executeSql(item.getValue());
-                    logger.info("Execution succeeded.");
-                    break;
-                }
+                StatementParam item = trans.get(0);
+                logger.info("Executing FlinkSQL: {}", item.getValue());
+                executor.executeSql(item.getValue());
+                logger.info("Execution succeeded.");
             }
         }
-        if (execute.size() > 0) {
+
+        if (!execute.isEmpty()) {
             List executes = new ArrayList<>();
             for (StatementParam item : execute) {
                 executes.add(item.getValue());
@@ -235,7 +234,8 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
                 break;
             }
         }
-        logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
+
+        logger.info("正在执行 FlinkSQL 语句集: {}", String.join(FlinkSQLConstant.SEPARATOR, executes));
         try {
             executor.execute(executorConfig.getJobName());
             logger.info("执行成功");
@@ -250,6 +250,7 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu
         if (StringUtils.isBlank(dinkyAddr)) {
             return;
         }
+
         if ("kubernetes-application".equals(type)) {
             try {
                 String httpJar = "http://" + dinkyAddr + "/download/downloadDepJar/" + taskId;
@@ -301,9 +302,8 @@

     private static void addURLs(URL[] jarUrls) {
         URLClassLoader urlClassLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
-        Method add = null;
         try {
-            add = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+            Method add = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
             add.setAccessible(true);
             for (URL jarUrl : jarUrls) {
                 add.invoke(urlClassLoader, jarUrl);
@@ -319,18 +319,18 @@ public static boolean downloadFile(String url, String path) throws IOException {
             // 设置超时间为3秒
             conn.setConnectTimeout(3 * 1000);
             // 获取输入流
-            InputStream inputStream = conn.getInputStream();
-            // 获取输出流
-            FileOutputStream outputStream = new FileOutputStream(path);
-            // 每次下载1024位
-            byte[] b = new byte[1024];
-            int len = -1;
-            while ((len = inputStream.read(b)) != -1) {
-                outputStream.write(b, 0, len);
+            try (InputStream inputStream = conn.getInputStream()) {
+                // 获取输出流
+                try (FileOutputStream outputStream = new FileOutputStream(path)) {
+                    // 每次下载1024位
+                    byte[] b = new byte[1024];
+                    int len = -1;
+                    while ((len = inputStream.read(b)) != -1) {
+                        outputStream.write(b, 0, len);
+                    }
+                    return true;
+                }
             }
-            inputStream.close();
-            outputStream.close();
-            return true;
         } catch (Exception e) {
             return false;
         }
diff --git a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
index 0772709bb7..8d140bb350 100644
--- a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
+++ b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
@@ -36,7 +36,7 @@
 public class LineageBuilder {

     public static LineageResult getColumnLineageByLogicalPlan(String statement) {
-        Explainer explainer = new Explainer(ExecutorFactory.getExecutor(), false);
+        Explainer explainer = new Explainer(ExecutorFactory.getDefaultExecutor(), false);
         List lineageRelList = explainer.getLineage(statement);
         List relations = new ArrayList<>();
         Map tableMap = new HashMap<>();
diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java
index 0bbea9771a..bce7d48497 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java
@@ -51,7 +51,11 @@
 @ApiModel(value = "JobConfig", description = "Configuration details of a job")
 public class JobConfig {

-    @ApiModelProperty(value = "Flink run mode", dataType = "String", example = "batch", notes = "Flink run mode")
+    @ApiModelProperty(
+            value = "Flink run mode",
+            dataType = "String",
+            example = "local standalone",
+            notes = "Flink run mode")
     private String type;

     @ApiModelProperty(value = "Check Point", dataType = "Integer", example = "1", notes = "Check point for the task")
@@ -217,6 +221,7 @@ public void setAddress(String address) {

     public ExecutorConfig getExecutorSetting() {
         return ExecutorConfig.build(
+                type,
                 address,
                 checkpoint,
                 parallelism,
diff --git a/dinky-executor/pom.xml b/dinky-executor/pom.xml
index 3b50c6b7b2..b1d3d97a90 100644
--- a/dinky-executor/pom.xml
+++ b/dinky-executor/pom.xml
@@ -139,6 +139,10 @@
         <dependency>
             <groupId>org.dinky</groupId>
             <artifactId>dinky-cdc-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.dinky</groupId>
+            <artifactId>dinky-gateway</artifactId>
+        </dependency>
diff --git a/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java b/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java
index deae30b6a1..9d9b99ac5e 100644
--- a/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java
+++ b/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java
@@ -20,6 +20,7 @@
 package org.dinky.executor;

 import org.dinky.assertion.Asserts;
+import org.dinky.gateway.enums.GatewayType;

 import org.apache.commons.lang3.math.NumberUtils;
@@ -36,6 +37,8 @@
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;

+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
@@ -46,12 +49,21 @@
  */
@Setter
@Getter
+@Builder
+@AllArgsConstructor
 @ApiModel(value = "ExecutorConfig", description = "Executor config for a job")
 public class ExecutorConfig {
     private static final Logger log = LoggerFactory.getLogger(ExecutorConfig.class);

-    public static final ExecutorConfig DEFAULT = new ExecutorConfig(0, 1, true);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    public static final ExecutorConfig DEFAULT = ExecutorConfig.builder()
+            .checkpoint(0)
+            .parallelism(1)
+            .useSqlFragment(true)
+            .build();
+
+    public static final String TYPE_CONST = "type";
     public static final String CHECKPOINT_CONST = "checkpoint";
     public static final String PARALLELISM_CONST = "parallelism";
"parallelism"; public static final String USE_SQL_FRAGMENT = "useSqlFragment"; @@ -61,7 +73,13 @@ public class ExecutorConfig { public static final String JOB_NAME = "jobName"; public static final String CONFIG_CONST = "config"; - private static final ObjectMapper mapper = new ObjectMapper(); + // after unique all run model to remote, this field could discard + @ApiModelProperty( + value = "Flink run mode", + dataType = "String", + example = "local standalone", + notes = "Flink run mode") + private String type; @ApiModelProperty( value = "Job manager rest host", @@ -139,67 +157,8 @@ public class ExecutorConfig { notes = "List of JAR files") private String[] jarFiles; - public ExecutorConfig(boolean useSqlFragment) { - this(null, useSqlFragment); - } - - public ExecutorConfig(Integer checkpoint) { - this(checkpoint, false); - } - - public ExecutorConfig(Integer checkpoint, boolean useSqlFragment) { - this(checkpoint, null, useSqlFragment, null, null); - } - - public ExecutorConfig(Integer checkpoint, Integer parallelism, boolean useSqlFragment) { - this(checkpoint, parallelism, useSqlFragment, null, null); - } - - public ExecutorConfig( - Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) { - this(checkpoint, parallelism, useSqlFragment, savePointPath, jobName, null); - } - - public ExecutorConfig(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) { - this(checkpoint, parallelism, useSqlFragment, savePointPath, null, null); - } - - public ExecutorConfig( - Integer checkpoint, - Integer parallelism, - boolean useSqlFragment, - String savePointPath, - String jobName, - Map config) { - this(null, null, checkpoint, parallelism, useSqlFragment, false, false, savePointPath, jobName, config, null); - } - - private ExecutorConfig( - String host, - Integer port, - Integer checkpoint, - Integer parallelism, - boolean useSqlFragment, - boolean useStatementSet, - boolean useBatchModel, - String savePointPath, - String jobName, - Map config, - Map variables) { - this.host = host; - this.port = port; - this.checkpoint = checkpoint; - this.parallelism = parallelism; - this.useSqlFragment = useSqlFragment; - this.useStatementSet = useStatementSet; - this.useBatchModel = useBatchModel; - this.savePointPath = savePointPath; - this.jobName = jobName; - this.config = config; - this.variables = variables; - } - public static ExecutorConfig build( + String type, String address, Integer checkpoint, Integer parallelism, @@ -224,21 +183,24 @@ public static ExecutorConfig build( } } - return new ExecutorConfig( - host, - port, - checkpoint, - parallelism, - useSqlFragment, - useStatementSet, - useBatchModel, - savePointPath, - jobName, - config, - variables); + return ExecutorConfig.builder() + .type(type) + .host(host) + .port(port) + .checkpoint(checkpoint) + .parallelism(parallelism) + .useSqlFragment(useSqlFragment) + .useStatementSet(useStatementSet) + .useBatchModel(useBatchModel) + .savePointPath(savePointPath) + .jobName(jobName) + .config(config) + .variables(variables) + .build(); } public static ExecutorConfig build( + String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, @@ -262,8 +224,9 @@ public static ExecutorConfig build( config.put(item.get("key"), item.get("value")); } } - return new ExecutorConfig( - null, + + return build( + type, null, checkpoint, parallelism, @@ -279,7 +242,9 @@ public static ExecutorConfig build( public static ExecutorConfig buildFromMap(Map settingMap) { Integer 
         Integer parallelism = NumberUtils.createInteger(settingMap.get(PARALLELISM_CONST));
+        String type = settingMap.get(TYPE_CONST);
         return build(
+                type,
                 checkpoint,
                 parallelism,
                 "1".equals(settingMap.get(USE_SQL_FRAGMENT)),
@@ -295,7 +260,7 @@ public String getJobManagerAddress() {
     }

     public boolean isRemote() {
-        return Asserts.isNotNullString(this.getHost());
+        return !GatewayType.get(type).isLocalExecute();
     }

     public boolean isValidParallelism() {
diff --git a/dinky-executor/src/main/java/org/dinky/executor/ExecutorFactory.java b/dinky-executor/src/main/java/org/dinky/executor/ExecutorFactory.java
index 8c540b8d82..a6d58d85fd 100644
--- a/dinky-executor/src/main/java/org/dinky/executor/ExecutorFactory.java
+++ b/dinky-executor/src/main/java/org/dinky/executor/ExecutorFactory.java
@@ -27,7 +27,9 @@
  **/
 public final class ExecutorFactory {

-    public static Executor getExecutor() {
+    private ExecutorFactory() {}
+
+    public static Executor getDefaultExecutor() {
         return new LocalStreamExecutor(ExecutorConfig.DEFAULT);
     }
diff --git a/dinky-executor/src/main/java/org/dinky/executor/LocalStreamExecutor.java b/dinky-executor/src/main/java/org/dinky/executor/LocalStreamExecutor.java
index cf680c5d21..b7a8d60f2d 100644
--- a/dinky-executor/src/main/java/org/dinky/executor/LocalStreamExecutor.java
+++ b/dinky-executor/src/main/java/org/dinky/executor/LocalStreamExecutor.java
@@ -30,7 +30,7 @@
 import cn.hutool.core.io.FileUtil;

 /**
- * LocalStreamExecuter
+ * LocalStreamExecutor
  *
  * @since 2021/5/25 13:48
  */
diff --git a/dinky-executor/src/test/java/org/dinky/interceptor/FlinkInterceptorTest.java b/dinky-executor/src/test/java/org/dinky/interceptor/FlinkInterceptorTest.java
index 0de91a8f07..11a2af005e 100644
--- a/dinky-executor/src/test/java/org/dinky/interceptor/FlinkInterceptorTest.java
+++ b/dinky-executor/src/test/java/org/dinky/interceptor/FlinkInterceptorTest.java
@@ -38,7 +38,7 @@ public void replaceFragmentTest() {
         String statement = "nullif1:=NULLIF(1, 0) as val;\n"
                 + "nullif2:=NULLIF(0, 0) as val$null;\n"
                 + "select ${nullif1},${nullif2}";
-        String pretreatStatement = FlinkInterceptor.pretreatStatement(ExecutorFactory.getExecutor(), statement);
+        String pretreatStatement = FlinkInterceptor.pretreatStatement(ExecutorFactory.getDefaultExecutor(), statement);
         Assert.assertEquals("select NULLIF(1, 0) as val,NULLIF(0, 0) as val$null", pretreatStatement);
     }
 }
diff --git a/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx b/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx
index ef67ddfe1c..6a9a4bda44 100644
--- a/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx
+++ b/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx
@@ -70,7 +70,7 @@ const JobModal: React.FC = (props) => {
   };

   useEffect(() => {
-    isUDF(jobType) && queryUdfTemplate();
+    if (isUDF(jobType)) queryUdfTemplate();
   }, [jobType, form]);

   /**
@@ -110,6 +110,7 @@ const JobModal: React.FC = (props) => {
           label={l('catalog.type')}
           tooltip={l('catalog.type.tip')}
           options={JOB_TYPE}
+          initialValue={JOB_TYPE[0]['options'][0]['value']}
           disabled={!!values.id}
           placeholder={l('catalog.type.placeholder')}
           rules={[{ required: true, message: l('catalog.type.placeholder') }]}
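
Note on the remote-detection change in ExecutorConfig above: isRemote() now keys off the configured run mode via GatewayType instead of the old heuristic of treating any non-null JobManager host as remote, which appears to be the local-mode bug the subject line refers to. The sketch below is a minimal, self-contained illustration of that detection logic, not code from this patch: RunMode is a hypothetical stand-in for org.dinky.gateway.enums.GatewayType, reduced to the get(String) and isLocalExecute() members the patch relies on, and the mode constants, their string values, and the IsRemoteSketch class are illustrative assumptions.

import java.util.Arrays;

// Hypothetical stand-in for org.dinky.gateway.enums.GatewayType, reduced to
// the two members the patched isRemote() depends on; the real enum defines
// more run modes than are listed here.
enum RunMode {
    LOCAL("local"),
    STANDALONE("standalone"),
    YARN_APPLICATION("yarn-application"),
    KUBERNETES_APPLICATION("kubernetes-application");

    private final String value;

    RunMode(String value) {
        this.value = value;
    }

    // Assumed to mirror GatewayType.get(String): resolve a run-mode string
    // (the "type" field that ExecutorConfig.buildFromMap now reads) to a constant.
    static RunMode get(String type) {
        return Arrays.stream(values())
                .filter(mode -> mode.value.equalsIgnoreCase(type))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unknown run mode: " + type));
    }

    // Assumed to mirror GatewayType#isLocalExecute(): only the local mode
    // executes inside the current process.
    boolean isLocalExecute() {
        return this == LOCAL;
    }
}

public class IsRemoteSketch {

    // The patched check: a job is remote iff its run mode does not execute
    // locally, replacing the old Asserts.isNotNullString(getHost()) test.
    static boolean isRemote(String type) {
        return !RunMode.get(type).isLocalExecute();
    }

    public static void main(String[] args) {
        for (String mode : new String[] {"local", "standalone", "yarn-application"}) {
            System.out.println(mode + " -> isRemote = " + isRemote(mode));
        }
    }
}

Under this scheme the host and port fields no longer influence the local/remote decision; they remain purely connection details for remote modes, so a local job that happens to carry a JobManager address is no longer misclassified as remote.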