Skip to content

Commit

Permalink
[BugFix]Fix udf compiler problem (DataLinkDC#3518)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored May 23, 2024
1 parent 67b7a7e commit 695b78d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.explainer.sqllineage.SQLLineageBuilder;
import org.dinky.function.FunctionFactory;
import org.dinky.function.compiler.CustomStringJavaCompiler;
import org.dinky.function.data.model.UDF;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.function.util.UDFUtil;
import org.dinky.gateway.enums.SavePointStrategy;
Expand Down Expand Up @@ -101,6 +103,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -561,7 +564,16 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
Integer taskVersionId = taskVersionService.createTaskVersionSnapshot(task);
task.setVersionId(taskVersionId);
if (Dialect.isUDF(task.getDialect())) {
UdfCodePool.addOrUpdate(UDFUtils.taskToUDF(task.buildTask()));
// compile udf class
UDF udf = UDFUtils.taskToUDF(task.buildTask());
try {
FunctionFactory.initUDF(Collections.singletonList(udf), task.getId());
} catch (Throwable e) {
throw new BusException(
"UDF compilation failed and cannot be published. The error message is as follows:"
+ e.getMessage());
}
UdfCodePool.addOrUpdate(udf);
}
} else {
if (Dialect.isUDF(task.getDialect())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,44 @@

import org.apache.flink.configuration.ReadableConfig;

import java.util.HashSet;
import java.util.Set;

import lombok.extern.slf4j.Slf4j;

/**
* java 编译
*
* @since 0.6.8
*/
@Slf4j
public class JavaCompiler implements FunctionCompiler {
private static final Set<String> COMPILER_CACHE = new HashSet<>();

/**
* 函数代码在线动态编译
*
* @param udf udf
* @param conf flink-conf
* @param udf udf
* @param conf flink-conf
* @param missionId 任务id
* @return 是否成功
*/
@Override
public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) {
public synchronized boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) {
String key = udf.getClassName() + udf.getFunctionLanguage();
if (COMPILER_CACHE.contains(key)) {
return true;
}
// TODO 改为ProcessStep注释
log.info("正在编译 java 代码 , class: " + udf.getClassName());
log.info("Compiling java code, class: {}", udf.getClassName());
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(udf.getCode());
boolean res = compiler.compilerToTmpPath(PathConstant.getUdfCompilerJavaPath(missionId));
String className = compiler.getFullClassName();
if (res) {
log.info("class编译成功:" + className);
log.info("compilerTakeTime:" + compiler.getCompilerTakeTime());
log.info("class编译成功:{}", className);
log.info("compiler take time:{}", compiler.getCompilerTakeTime());
COMPILER_CACHE.add(key);
return true;
} else {
log.error("class编译失败:{}", className);
log.error("class编译失败:" + className);
log.error(compiler.getCompilerMessage());
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.flink.table.catalog.FunctionLanguage;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

/** @since 0.6.8 */
@Getter
@Setter
@Builder
@EqualsAndHashCode
public class UDF {

/** 函数名 */
Expand Down

0 comments on commit 695b78d

Please sign in to comment.