Skip to content

Commit

Permalink
[BugFix][Flink Jar]Fix FLINK JAR submission (#4073)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Dec 22, 2024
1 parent 9cf618c commit de3aeed
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.springframework.stereotype.Service;

Expand Down Expand Up @@ -122,8 +123,10 @@ public LineageResult getLineage(StudioLineageDTO studioCADTO) {
TaskDTO taskDTO = taskService.getTaskInfoById(studioCADTO.getTaskId());
taskDTO.setStatement(taskService.buildEnvSql(taskDTO) + studioCADTO.getStatement());
JobConfig jobConfig = taskDTO.getJobConfig();
jobConfig.setUdfRefer(studioCADTO.getConfigJson().getUdfReferMaps());
jobConfig.setConfigJson(studioCADTO.getConfigJson().getCustomConfigMaps());
Optional.ofNullable(studioCADTO.getConfigJson()).ifPresent(config -> {
jobConfig.setUdfRefer(config.getUdfReferMaps());
jobConfig.setConfigJson(config.getCustomConfigMaps());
});

return LineageBuilder.getColumnLineageByLogicalPlan(taskDTO.getStatement(), jobConfig);
}
Expand Down
5 changes: 3 additions & 2 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements)
jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType);
} else if (operationType.equals(SqlType.EXECUTE_JAR)) {
JarSubmitParam jarSubmitParam = JarSubmitParam.build(statement);
jarSubmitParam.setUri("base64@" + Base64.encode(pretreatStatement(jarSubmitParam.getArgs())));
String args = jarSubmitParam.getArgs();
jarSubmitParam.setUri("base64@" + Base64.encode(isUseSqlFragment() ? pretreatStatement(args) : args));
jobStatementPlan.addJobStatement(
jarSubmitParam.toString(), JobStatementType.EXECUTE_JAR, operationType);
jarSubmitParam.getStatement(), JobStatementType.EXECUTE_JAR, operationType);
} else if (operationType.equals(SqlType.EXECUTE)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.PIPELINE, operationType);
} else if (operationType.equals(SqlType.PRINT)) {
Expand Down

0 comments on commit de3aeed

Please sign in to comment.