From 375df0c78fa72608fbce6b5afccf53c0888197e6 Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Thu, 23 May 2024 16:59:45 +0800 Subject: [PATCH] [BugFix] [UDF] Fix the issue where UDF cannot find classes in Flink SQL tasks (#3523) --- .../java/org/dinky/job/builder/JobUDFBuilder.java | 10 +++++++++- .../dinky/function/compiler/FunctionCompiler.java | 11 +++++++++++ .../org/dinky/function/compiler/JavaCompiler.java | 14 +++----------- .../main/java/org/dinky/function/util/UDFUtil.java | 2 +- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java index ee1a1dca5b..3746abd6ad 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java @@ -70,10 +70,10 @@ public void run() throws Exception { List jarFiles = new ArrayList<>(jobManager.getUdfPathContextHolder().getAllFileSet()); + String[] userCustomUdfJarPath = UDFUtil.initJavaUDF(udfList, taskId); String[] jarPaths = CollUtil.removeNull(jarFiles).stream() .map(File::getAbsolutePath) .toArray(String[]::new); - if (GATEWAY_TYPE_MAP.get(SESSION).contains(runMode)) { config.setJarFiles(jarPaths); } @@ -82,6 +82,7 @@ public void run() throws Exception { String[] pyPaths = UDFUtil.initPythonUDF( udfList, runMode, config.getTaskId(), executor.getTableConfig().getConfiguration()); + executor.initUDF(userCustomUdfJarPath); executor.initUDF(jarPaths); if (ArrayUtil.isNotEmpty(pyPaths)) { @@ -91,6 +92,13 @@ public void run() throws Exception { } } } + if (ArrayUtil.isNotEmpty(userCustomUdfJarPath)) { + for (String jarPath : userCustomUdfJarPath) { + if (StrUtil.isNotBlank(jarPath)) { + jobManager.getUdfPathContextHolder().addUdfPath(new File(jarPath)); + } + } + } Set pyUdfFile = jobManager.getUdfPathContextHolder().getPyUdfFile(); executor.initPyUDF( diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java index d00ba28dd5..42b1e6c6d3 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java @@ -25,13 +25,16 @@ import org.apache.flink.configuration.ReadableConfig; +import java.util.HashSet; import java.util.List; +import java.util.Set; import cn.hutool.core.lang.Singleton; import cn.hutool.core.util.StrUtil; /** @since 0.6.8 */ public interface FunctionCompiler { + Set COMPILER_CACHE = new HashSet<>(); /** * 函数代码在线动态编译 @@ -54,6 +57,11 @@ public interface FunctionCompiler { static boolean getCompiler(UDF udf, ReadableConfig conf, Integer missionId) { Asserts.checkNull(udf, "udf为空"); Asserts.checkNull(udf.getCode(), "udf 代码为空"); + + String key = udf.getClassName() + udf.getFunctionLanguage(); + if (COMPILER_CACHE.contains(key)) { + return true; + } boolean success; switch (udf.getFunctionLanguage()) { case JAVA: @@ -69,6 +77,9 @@ static boolean getCompiler(UDF udf, ReadableConfig conf, Integer missionId) { throw UDFCompilerException.notSupportedException( udf.getFunctionLanguage().name()); } + if (success) { + COMPILER_CACHE.add(key); + } return success; } diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java index 56fc310d78..2394ecfde1 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java @@ -24,9 +24,6 @@ import org.apache.flink.configuration.ReadableConfig; -import java.util.HashSet; -import java.util.Set; - import lombok.extern.slf4j.Slf4j; /** @@ -34,7 +31,6 @@ */ @Slf4j public class JavaCompiler implements FunctionCompiler { - private static final Set COMPILER_CACHE = new HashSet<>(); /** * 函数代码在线动态编译 @@ -46,22 +42,18 @@ public class JavaCompiler implements FunctionCompiler { */ @Override public synchronized boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { - String key = udf.getClassName() + udf.getFunctionLanguage(); - if (COMPILER_CACHE.contains(key)) { - return true; - } + // TODO 改为ProcessStep注释 log.info("Compiling java code, class: {}", udf.getClassName()); CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(udf.getCode()); boolean res = compiler.compilerToTmpPath(PathConstant.getUdfCompilerJavaPath(missionId)); String className = compiler.getFullClassName(); if (res) { - log.info("class编译成功:{}", className); + log.info("class compiled successfully:{}", className); log.info("compiler take time:{}", compiler.getCompilerTakeTime()); - COMPILER_CACHE.add(key); return true; } else { - log.error("class编译失败:{}", className); + log.error("class compilation failed:{}", className); log.error(compiler.getCompilerMessage()); return false; } diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index 427b1dec2f..aac5ff08d8 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -154,7 +154,7 @@ public static String templateParse(String dialect, String template, String class } } - public static String[] initJavaUDF(List udf, GatewayType gatewayType, Integer missionId) { + public static String[] initJavaUDF(List udf, Integer missionId) { return FunctionFactory.initUDF( CollUtil.newArrayList( CollUtil.filterNew(udf, x -> x.getFunctionLanguage() != FunctionLanguage.PYTHON)),