[Refactor][UDF]Refactor udf execute (DataLinkDC#4017)
Co-authored-by: zackyoungh <[email protected]>
2 people authored and Zzm0809 committed Dec 7, 2024
1 parent b9088e1 commit 37af6a4
Showing 27 changed files with 328 additions and 1,492 deletions.
@@ -94,14 +94,13 @@ public Map<String, String> getUdfReferMaps() {
     return Asserts.isNotNullCollection(udfRefer)
             ? udfRefer.stream()
                     .filter(item -> item.getClassName() != null)
-                    .map(t -> {
+                    .peek(t -> {
                         if (StringUtils.isEmpty(t.getName())) {
                             String name = t.getClassName()
                                     .substring(t.getClassName().lastIndexOf(".") + 1);
                             name = name.substring(0, 1).toLowerCase() + name.substring(1);
                             t.setName(name);
                         }
-                        return t;
                     })
                     .collect(Collectors.toConcurrentMap(TaskUdfRefer::getClassName, TaskUdfRefer::getName))
             : new HashMap<>();
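
The map-to-peek change above keeps the element type while back-filling a missing name in place. A minimal, standalone sketch of the same default-naming rule (class names here are illustrative, not from the repository):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class UdfReferNameDemo {
    // Same rule as the hunk above: simple class name, first letter lower-cased.
    static String defaultName(String className) {
        String simple = className.substring(className.lastIndexOf(".") + 1);
        return simple.substring(0, 1).toLowerCase() + simple.substring(1);
    }

    public static void main(String[] args) {
        Map<String, String> referMap = List.of("org.example.udf.MyUpper", "Lonely").stream()
                .collect(Collectors.toConcurrentMap(cn -> cn, UdfReferNameDemo::defaultName));
        // e.g. {Lonely=lonely, org.example.udf.MyUpper=myUpper} (iteration order not guaranteed)
        System.out.println(referMap);
    }
}

Note that the Stream javadoc positions peek mainly as a debugging aid; the mutation works here because the terminal collect consumes every element, though map(t -> { ...; return t; }) is the more conventional form.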
@@ -63,7 +63,6 @@
 import org.dinky.explainer.lineage.LineageBuilder;
 import org.dinky.explainer.lineage.LineageResult;
 import org.dinky.explainer.sqllineage.SQLLineageBuilder;
-import org.dinky.function.FunctionFactory;
 import org.dinky.function.compiler.CustomStringJavaCompiler;
 import org.dinky.function.data.model.UDF;
 import org.dinky.function.pool.UdfCodePool;
@@ -107,7 +106,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -583,15 +581,14 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
 task.setVersionId(taskVersionId);
 if (Dialect.isUDF(task.getDialect())) {
     // compile udf class
-    UDF udf = UDFUtils.taskToUDF(task.buildTask());
     try {
-        FunctionFactory.initUDF(Collections.singletonList(udf), task.getId());
+        UDF udf = UDFUtils.taskToUDF(task.buildTask());
+        UdfCodePool.addOrUpdate(udf);
     } catch (Throwable e) {
         throw new BusException(
                 "UDF compilation failed and cannot be published. The error message is as follows:"
                         + e.getMessage());
     }
-    UdfCodePool.addOrUpdate(udf);
 }
 } else {
     if (Dialect.isUDF(task.getDialect())
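
Because taskToUDF now compiles the function eagerly (see the UDFUtils hunk further down), the call moves inside the try so a compile error is rethrown as a BusException at publish time rather than at first execution. A hedged sketch of the caller-visible effect (the service handle, logger, and PUBLISH constant are illustrative, not from this diff):

// Publishing is now fail-fast: a broken UDF is rejected here.
try {
    taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.PUBLISH);
} catch (BusException e) {
    // The message carries the compiler output appended above.
    log.error("publish rejected: {}", e.getMessage());
}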
@@ -24,6 +24,7 @@
 import org.dinky.data.dto.TaskDTO;
 import org.dinky.data.result.SqlExplainResult;
 import org.dinky.job.JobResult;
+import org.dinky.job.runner.FlinkJarUtil;
 
 import java.util.List;
 
@@ -53,8 +54,9 @@ public boolean stop() {

 @Override
 public ObjectNode getJobPlan() {
+    String statement = task.getStatement();
     try {
-        return jobManager.getJarStreamGraphJson(task.getStatement());
+        return FlinkJarUtil.getJobPlan(statement, jobManager);
     } catch (Exception e) {
         throw new RuntimeException(e);
     }
@@ -29,8 +29,6 @@
 import org.dinky.job.JobResult;
 import org.dinky.utils.UDFUtils;
 
-import java.util.Collections;
-
 import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.exceptions.ExceptionUtil;
 
@@ -47,7 +45,7 @@ public JobResult execute() throws Exception {
 jobResult.setStatus(Job.JobStatus.SUCCESS);
 try {
     UDF udf = UDFUtils.taskToUDF(BeanUtil.toBean(task, Task.class));
-    FunctionFactory.initUDF(Collections.singletonList(udf), task.getId());
+    FunctionFactory.initUDF(udf, task.getId());
 } catch (Exception e) {
     jobResult.setSuccess(false);
     jobResult.setError(ExceptionUtil.getRootCauseMessage(e));
8 changes: 7 additions & 1 deletion dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java
@@ -23,6 +23,8 @@
 import org.dinky.data.exception.BusException;
 import org.dinky.data.model.Task;
 import org.dinky.data.model.udf.UDFManage;
+import org.dinky.function.compiler.FunctionCompiler;
+import org.dinky.function.compiler.FunctionPackage;
 import org.dinky.function.data.model.UDF;
 import org.dinky.function.util.UDFUtil;
 
@@ -33,11 +35,15 @@ public class UDFUtils extends UDFUtil {
 public static UDF taskToUDF(Task task) {
     if (Asserts.isNotNull(task.getConfigJson())
             && Asserts.isNotNull(task.getConfigJson().getUdfConfig())) {
-        return UDF.builder()
+        UDF udf = UDF.builder()
                 .className(task.getConfigJson().getUdfConfig().getClassName())
                 .code(task.getStatement())
                 .functionLanguage(FunctionLanguage.valueOf(task.getDialect().toUpperCase()))
                 .build();
+
+        FunctionCompiler.getCompilerByTask(udf, task.getConfigJson().getCustomConfigMaps(), task.getId());
+        FunctionPackage.bale(udf, task.getId());
+        return udf;
     } else {
         throw new BusException("udf `class` config is null,please check your udf task config");
     }
20 changes: 13 additions & 7 deletions dinky-admin/src/main/resources/DinkyFlinkDockerfile
@@ -1,22 +1,28 @@
 # Used to build the Dinky environment
-ARG FLINK_VERSION=1.14.5
-ARG FLINK_BIG_VERSION=1.14
+ARG FLINK_VERSION=1.20.0
+ARG FLINK_BIG_VERSION=1.20
 
-FROM flink:${FLINK_VERSION}
+FROM flink:${FLINK_VERSION}-scala_2.12-java8
 
 ARG FLINK_VERSION
 ARG FLINK_BIG_VERSION
 ENV PYTHON_HOME /opt/miniconda3
 
 USER root
 RUN wget "https://s3.jcloud.sjtu.edu.cn/899a892efef34b1b944a19981040f55b-oss01/anaconda/miniconda/Miniconda3-py38_4.9.2-Linux-x86_64.sh" -O "miniconda.sh" && chmod +x miniconda.sh
-RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME
 
+RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME
 USER flink
 
 ENV PATH $PYTHON_HOME/bin:$PATH
-RUN pip install "apache-flink==${FLINK_VERSION}" -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
 
-RUN cp /opt/flink/opt/flink-python_* /opt/flink/lib/
+RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
+RUN rm -rf /opt/miniconda3/lib/python3.8/site-packages/ruamel*
+RUN pip install "apache-flink==${FLINK_VERSION}" -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
 
+RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/
+RUN rm -f /opt/flink/lib/flink-table-planner-loader*.jar
 
+RUN cp /opt/flink/opt/flink-table-planner* /opt/flink/lib/
 
-RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar
+# RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar
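
Assuming the defaults above, the image should build with something like docker build --build-arg FLINK_VERSION=1.20.0 --build-arg FLINK_BIG_VERSION=1.20 -f DinkyFlinkDockerfile . (flags illustrative). The final wget of the Dinky app jar stays commented out, so the image no longer tries to bake that jar in at build time.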
@@ -46,7 +46,9 @@ public void addFile(File file) {
 }
 
 public void addPyUdfPath(File file) {
-    getPyUdfFile().add(file);
+    Set<File> pyUdfFile = getPyUdfFile();
+    pyUdfFile.add(file);
+    addUdfPath(file);
 }
 
 public void addOtherPlugins(File file) {
46 changes: 17 additions & 29 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -31,16 +31,12 @@
 import org.dinky.executor.Executor;
 import org.dinky.explainer.mock.MockStatementExplainer;
 import org.dinky.function.data.model.UDF;
+import org.dinky.function.pool.UdfCodePool;
-import org.dinky.function.util.UDFUtil;
-import org.dinky.interceptor.FlinkInterceptor;
 import org.dinky.job.JobConfig;
 import org.dinky.job.JobManager;
-import org.dinky.job.JobParam;
+import org.dinky.job.JobRunnerFactory;
 import org.dinky.job.JobStatementPlan;
-import org.dinky.job.builder.JobUDFBuilder;
-import org.dinky.trans.Operations;
-import org.dinky.utils.DinkyClassLoaderUtil;
 import org.dinky.utils.LogUtil;
 import org.dinky.utils.SqlUtil;
 
@@ -84,19 +80,6 @@ public static Explainer build(Executor executor, boolean useStatementSet, JobMan
     return new Explainer(executor, useStatementSet, jobManager);
 }
 
-public Explainer initialize(JobConfig config, String statement) {
-    DinkyClassLoaderUtil.initClassLoader(config, jobManager.getDinkyClassLoader());
-    String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement));
-    List<UDF> udfs = parseUDFFromStatements(statements);
-    jobManager.setJobParam(new JobParam(udfs));
-    try {
-        JobUDFBuilder.build(jobManager).run();
-    } catch (Exception e) {
-        e.printStackTrace();
-    }
-    return this;
-}
-
 public JobStatementPlan parseStatements(String[] statements) {
     JobStatementPlan jobStatementPlanWithMock = new JobStatementPlan();
     generateUDFStatement(jobStatementPlanWithMock);
@@ -115,7 +98,9 @@ private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
 List<String> udfStatements = new ArrayList<>();
 Optional.ofNullable(jobManager.getConfig().getUdfRefer())
         .ifPresent(t -> t.forEach((key, value) -> {
-            String sql = String.format("create temporary function %s as '%s'", value, key);
+            UDF udf = UdfCodePool.getUDF(key);
+            String sql = String.format(
+                    "create temporary function %s as '%s' language %s", value, key, udf.getFunctionLanguage());
             udfStatements.add(sql);
         }));
 for (String udfStatement : udfStatements) {
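
The registration DDL now carries the function language from the code pool, so non-Java UDFs (e.g. Python) register with the right runtime. Illustratively, for a refer entry mapping org.example.udf.MyUpper to myUpper (names invented here):

// What the String.format above produces for a Java UDF:
String sql = String.format(
        "create temporary function %s as '%s' language %s",
        "myUpper", "org.example.udf.MyUpper", FunctionLanguage.JAVA);
// -> create temporary function myUpper as 'org.example.udf.MyUpper' language JAVA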
Expand Down Expand Up @@ -217,24 +202,27 @@ public List<LineageRel> getLineage(String statement) {
         .fragment(true)
         .statementSet(useStatementSet)
         .parallelism(1)
+        .udfRefer(jobManager.getConfig().getUdfRefer())
         .configJson(executor.getTableConfig().getConfiguration().toMap())
         .build();
 jobManager.setConfig(jobConfig);
 jobManager.setExecutor(executor);
-this.initialize(jobConfig, statement);
 
 List<LineageRel> lineageRelList = new ArrayList<>();
-for (String item : SqlUtil.getStatements(statement)) {
+String[] statements = SqlUtil.getStatements(statement);
+JobStatementPlan jobStatementPlan = parseStatements(statements);
+List<JobStatement> statementList = jobStatementPlan.getJobStatementList();
+JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(jobManager);
+
+for (JobStatement item : statementList) {
+    String sql = item.getStatement();
+    SqlType sqlType = item.getSqlType();
+
     try {
-        String sql = FlinkInterceptor.pretreatStatement(executor, item);
-        if (Asserts.isNullString(sql)) {
-            continue;
-        }
-        SqlType operationType = Operations.getOperationType(sql);
-        if (operationType.equals(SqlType.INSERT)) {
+        if (sqlType.equals(SqlType.INSERT)) {
             lineageRelList.addAll(executor.getLineage(sql));
-        } else if (!operationType.equals(SqlType.SELECT) && !operationType.equals(SqlType.PRINT)) {
-            executor.executeSql(sql);
+        } else if (!sqlType.equals(SqlType.SELECT) && !sqlType.equals(SqlType.PRINT)) {
+            jobRunnerFactory.getJobRunner(item.getStatementType()).run(item);
         }
     } catch (Exception e) {
         log.error("Exception occurred while fetching lineage information", e);
64 changes: 0 additions & 64 deletions dinky-core/src/main/java/org/dinky/job/JobBuilder.java

This file was deleted.

14 changes: 0 additions & 14 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -52,26 +52,19 @@
 import org.dinky.gateway.result.GatewayResult;
 import org.dinky.gateway.result.SavePointResult;
 import org.dinky.gateway.result.TestResult;
-import org.dinky.job.builder.JobJarStreamGraphBuilder;
 import org.dinky.job.runner.JobJarRunner;
 import org.dinky.trans.Operations;
 import org.dinky.trans.parse.AddFileSqlParseStrategy;
 import org.dinky.trans.parse.AddJarSqlParseStrategy;
 import org.dinky.utils.DinkyClassLoaderUtil;
-import org.dinky.utils.FlinkStreamEnvironmentUtil;
-import org.dinky.utils.JsonUtils;
 import org.dinky.utils.LogUtil;
 import org.dinky.utils.SqlUtil;
 import org.dinky.utils.URLUtils;
 
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -248,13 +241,6 @@ public boolean close() {
     return true;
 }
 
-public ObjectNode getJarStreamGraphJson(String statement) {
-    Pipeline pipeline = JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader());
-    Configuration configuration = Configuration.fromMap(getExecutorConfig().getConfig());
-    JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
-    return JsonUtils.parseObject(JsonPlanGenerator.generatePlan(jobGraph));
-}
-
 @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
 public JobResult executeJarSql(String statement) throws Exception {
     List<String> statements = Arrays.stream(SqlUtil.getStatements(statement))
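
FlinkJarUtil.getJobPlan(statement, jobManager), used in the getJobPlan hunk earlier, presumably absorbs the method deleted above; its body is not part of this excerpt. The core translation those removed lines performed is plain Flink API, sketched standalone here (FlinkStreamEnvironmentUtil is the Dinky helper from the removed imports):

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;

import org.dinky.utils.FlinkStreamEnvironmentUtil;

public class JobPlanSketch {
    // Pipeline -> JobGraph -> JSON plan, as in the deleted method above.
    static String toJsonPlan(Pipeline pipeline, Configuration configuration) {
        JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
        return JsonPlanGenerator.generatePlan(jobGraph);
    }
}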