From 06744f127afb409dc15b430fd86b1f3bd91b5aa4 Mon Sep 17 00:00:00 2001 From: zhangyuhang <2827528315@qq.com> Date: Wed, 8 Jan 2025 17:47:50 +0800 Subject: [PATCH] [Optimize][dinky-gateway] The user-defined flink conf path overrides the flink conf path parameter. (#4127) Co-authored-by: yuhang2.zhang Co-authored-by: GH Action - Upstream Sync --- .../main/java/org/dinky/data/enums/Status.java | 4 ++-- .../src/main/java/org/dinky/job/JobConfig.java | 18 ++++-------------- .../gateway/kubernetes/KubernetesGateway.java | 12 ++++++++++-- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/dinky-common/src/main/java/org/dinky/data/enums/Status.java b/dinky-common/src/main/java/org/dinky/data/enums/Status.java index efe2711db5..f68b177cc2 100644 --- a/dinky-common/src/main/java/org/dinky/data/enums/Status.java +++ b/dinky-common/src/main/java/org/dinky/data/enums/Status.java @@ -444,8 +444,8 @@ public enum Status { /** * gateway config */ - GAETWAY_KUBERNETS_TEST_FAILED(180, "gateway.kubernetes.test.failed"), - GAETWAY_KUBERNETS_TEST_SUCCESS(181, "gateway.kubernetes.test.success"), + GATEWAY_KUBERNETES_TEST_FAILED(180, "gateway.kubernetes.test.failed"), + GATEWAY_KUBERNETES_TEST_SUCCESS(181, "gateway.kubernetes.test.success"), /** * process diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index 5e812393e0..a46011458e 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -29,7 +29,6 @@ import org.dinky.gateway.enums.SavePointStrategy; import org.dinky.gateway.model.FlinkClusterConfig; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; @@ -43,14 +42,12 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; -import lombok.extern.slf4j.Slf4j; /** * JobConfig * * @since 2021/6/27 18:45 */ -@Slf4j @Data @Builder @AllArgsConstructor @@ -254,6 +251,7 @@ public ExecutorConfig getExecutorSetting() { public void buildGatewayConfig(FlinkClusterConfig config) { FlinkConfig flinkConfig = config.getFlinkConfig(); + // Prioritize loading custom Flink configuration content in the cluster configuration for (CustomConfig customConfig : flinkConfig.getFlinkConfigList()) { Assert.notNull(customConfig.getName(), "Custom flink config has null key"); @@ -261,17 +259,9 @@ public void buildGatewayConfig(FlinkClusterConfig config) { flinkConfig.getConfiguration().put(customConfig.getName(), customConfig.getValue()); } - Map configuration = flinkConfig.getConfiguration(); - - // In Kubernetes mode, must set jobmanager.memory.process.size. - if (StringUtils.isBlank(configuration.get("jobmanager.memory.process.size"))) { - log.warn("In Kubernetes mode, please configure 'jobmanager.memory.process.size', default 2048m"); - configuration.put("jobmanager.memory.process.size", "2048m"); - } - - // Load job configuration content afterwards - configuration.putAll(getConfigJson()); - configuration.put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism)); + // Load job configuration content afterwords + flinkConfig.getConfiguration().putAll(getConfigJson()); + flinkConfig.getConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism)); flinkConfig.setJobName(getJobName()); gatewayConfig = GatewayConfig.build(config); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java index 47cb9f075c..3b2b5ee42b 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java @@ -31,9 +31,11 @@ import org.dinky.gateway.result.TestResult; import org.dinky.utils.TextUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.DeploymentOptionsInternal; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.kubernetes.KubernetesClusterClientFactory; @@ -86,6 +88,12 @@ public void init() { protected void initConfig() { flinkConfigPath = config.getClusterConfig().getFlinkConfigPath(); + + // The user-defined flink conf path overrides the flink conf path parameter. + if (StringUtils.isNotBlank(flinkConfigPath)) { + addConfigParas(DeploymentOptionsInternal.CONF_DIR, flinkConfigPath); + } + flinkConfig = config.getFlinkConfig(); String jobName = flinkConfig.getJobName(); if (TextUtil.isEmpty(jobName)) { @@ -222,9 +230,9 @@ public TestResult test() { } return TestResult.success(); } catch (Exception e) { - logger.error(Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e); + logger.error(Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e); return TestResult.fail( - StrFormatter.format("{}:{}", Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e.getMessage())); + StrFormatter.format("{}:{}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e.getMessage())); } finally { close(); }