Skip to content

Commit

Permalink
[BugFix][FlinkJar]Fix the issue where FlinkJar cannot use global vari…
Browse files Browse the repository at this point in the history
…ables (#4052)

Co-authored-by: zackyoungh <[email protected]>
  • Loading branch information
zackyoungh and zackyoungh authored Dec 16, 2024
1 parent 04f9072 commit f5f833f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,17 @@ public String getArgs() {
}
return args;
}

public String getStatement() {
return StrUtil.format(
"EXECUTE JAR WITH (\n" + "'uri'='{}',\n"
+ "'main-class'='{}',\n"
+ "'args'='{}',\n"
+ "'allowNonRestoredState'='{}'\n"
+ ");",
getUri(),
getMainClass(),
getArgs(),
getAllowNonRestoredState());
}
}
7 changes: 6 additions & 1 deletion dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.data.job.JobStatement;
import org.dinky.data.job.JobStatementType;
import org.dinky.data.job.SqlType;
import org.dinky.data.model.JarSubmitParam;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.explainer.print_table.PrintStatementExplainer;
Expand Down Expand Up @@ -66,6 +67,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.codec.Base64;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.URLUtil;
Expand Down Expand Up @@ -214,7 +216,10 @@ public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements)
if (operationType.equals(SqlType.SET) || operationType.equals(SqlType.RESET)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType);
} else if (operationType.equals(SqlType.EXECUTE_JAR)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE_JAR, operationType);
JarSubmitParam jarSubmitParam = JarSubmitParam.build(statement);
jarSubmitParam.setUri("base64@" + Base64.encode(pretreatStatement(jarSubmitParam.getArgs())));
jobStatementPlan.addJobStatement(
jarSubmitParam.toString(), JobStatementType.EXECUTE_JAR, operationType);
} else if (operationType.equals(SqlType.EXECUTE)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.PIPELINE, operationType);
} else if (operationType.equals(SqlType.PRINT)) {
Expand Down
15 changes: 0 additions & 15 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.dinky.explainer.mock.MockStatementExplainer;
import org.dinky.function.data.model.UDF;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.function.util.UDFUtil;
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
import org.dinky.job.JobRunnerFactory;
Expand Down Expand Up @@ -108,20 +107,6 @@ private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
}
}

public List<UDF> parseUDFFromStatements(String[] statements) {
List<UDF> udfList = new ArrayList<>();
for (String statement : statements) {
if (statement.isEmpty()) {
continue;
}
UDF udf = UDFUtil.toUDF(statement, jobManager.getDinkyClassLoader());
if (Asserts.isNotNull(udf)) {
udfList.add(udf);
}
}
return udfList;
}

public ExplainResult explainSql(String statement) {
log.info("Start explain FlinkSQL...");
JobStatementPlan jobStatementPlan;
Expand Down

0 comments on commit f5f833f

Please sign in to comment.