diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 66da25d058..72f8df14d7 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -59,7 +59,9 @@ import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; import org.dinky.explainer.sqllineage.SQLLineageBuilder; +import org.dinky.function.FunctionFactory; import org.dinky.function.compiler.CustomStringJavaCompiler; +import org.dinky.function.data.model.UDF; import org.dinky.function.pool.UdfCodePool; import org.dinky.function.util.UDFUtil; import org.dinky.gateway.enums.SavePointStrategy; @@ -101,6 +103,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -561,7 +564,16 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro Integer taskVersionId = taskVersionService.createTaskVersionSnapshot(task); task.setVersionId(taskVersionId); if (Dialect.isUDF(task.getDialect())) { - UdfCodePool.addOrUpdate(UDFUtils.taskToUDF(task.buildTask())); + // compile udf class + UDF udf = UDFUtils.taskToUDF(task.buildTask()); + try { + FunctionFactory.initUDF(Collections.singletonList(udf), task.getId()); + } catch (Throwable e) { + throw new BusException( + "UDF compilation failed and cannot be published. The error message is as follows:" + + e.getMessage()); + } + UdfCodePool.addOrUpdate(udf); } } else { if (Dialect.isUDF(task.getDialect()) diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java index 4eb92d15bf..56fc310d78 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java @@ -24,38 +24,44 @@ import org.apache.flink.configuration.ReadableConfig; +import java.util.HashSet; +import java.util.Set; + import lombok.extern.slf4j.Slf4j; /** * java 编译 - * - * @since 0.6.8 */ @Slf4j public class JavaCompiler implements FunctionCompiler { + private static final Set COMPILER_CACHE = new HashSet<>(); /** * 函数代码在线动态编译 * - * @param udf udf - * @param conf flink-conf + * @param udf udf + * @param conf flink-conf * @param missionId 任务id * @return 是否成功 */ @Override - public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { + public synchronized boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { + String key = udf.getClassName() + udf.getFunctionLanguage(); + if (COMPILER_CACHE.contains(key)) { + return true; + } // TODO 改为ProcessStep注释 - log.info("正在编译 java 代码 , class: " + udf.getClassName()); + log.info("Compiling java code, class: {}", udf.getClassName()); CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(udf.getCode()); boolean res = compiler.compilerToTmpPath(PathConstant.getUdfCompilerJavaPath(missionId)); String className = compiler.getFullClassName(); if (res) { - log.info("class编译成功:" + className); - log.info("compilerTakeTime:" + compiler.getCompilerTakeTime()); + log.info("class编译成功:{}", className); + log.info("compiler take time:{}", compiler.getCompilerTakeTime()); + COMPILER_CACHE.add(key); return true; } else { log.error("class编译失败:{}", className); - log.error("class编译失败:" + className); log.error(compiler.getCompilerMessage()); return false; } diff --git a/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java b/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java index 07d18911e1..6cb8c1535c 100644 --- a/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java +++ b/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java @@ -22,6 +22,7 @@ import org.apache.flink.table.catalog.FunctionLanguage; import lombok.Builder; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -29,6 +30,7 @@ @Getter @Setter @Builder +@EqualsAndHashCode public class UDF { /** 函数名 */