Skip to content

Commit

Permalink
[Optimize][dinky-gateway] The user-defined flink conf path overrides …
Browse files Browse the repository at this point in the history
…the flink conf path parameter. (#4127)

Co-authored-by: yuhang2.zhang <[email protected]>
Co-authored-by: GH Action - Upstream Sync <[email protected]>
  • Loading branch information
3 people authored Jan 8, 2025
1 parent 7df416c commit 06744f1
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 18 deletions.
4 changes: 2 additions & 2 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ public enum Status {
/**
* gateway config
*/
GAETWAY_KUBERNETS_TEST_FAILED(180, "gateway.kubernetes.test.failed"),
GAETWAY_KUBERNETS_TEST_SUCCESS(181, "gateway.kubernetes.test.success"),
GATEWAY_KUBERNETES_TEST_FAILED(180, "gateway.kubernetes.test.failed"),
GATEWAY_KUBERNETES_TEST_SUCCESS(181, "gateway.kubernetes.test.success"),

/**
* process
Expand Down
18 changes: 4 additions & 14 deletions dinky-core/src/main/java/org/dinky/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.dinky.gateway.enums.SavePointStrategy;
import org.dinky.gateway.model.FlinkClusterConfig;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.RestOptions;
Expand All @@ -43,14 +42,12 @@
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
* JobConfig
*
* @since 2021/6/27 18:45
*/
@Slf4j
@Data
@Builder
@AllArgsConstructor
Expand Down Expand Up @@ -254,24 +251,17 @@ public ExecutorConfig getExecutorSetting() {

public void buildGatewayConfig(FlinkClusterConfig config) {
FlinkConfig flinkConfig = config.getFlinkConfig();

// Prioritize loading custom Flink configuration content in the cluster configuration
for (CustomConfig customConfig : flinkConfig.getFlinkConfigList()) {
Assert.notNull(customConfig.getName(), "Custom flink config has null key");
Assert.notNull(customConfig.getValue(), "Custom flink config has null value");
flinkConfig.getConfiguration().put(customConfig.getName(), customConfig.getValue());
}

Map<String, String> configuration = flinkConfig.getConfiguration();

// In Kubernetes mode, must set jobmanager.memory.process.size.
if (StringUtils.isBlank(configuration.get("jobmanager.memory.process.size"))) {
log.warn("In Kubernetes mode, please configure 'jobmanager.memory.process.size', default 2048m");
configuration.put("jobmanager.memory.process.size", "2048m");
}

// Load job configuration content afterwards
configuration.putAll(getConfigJson());
configuration.put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism));
// Load job configuration content afterwords
flinkConfig.getConfiguration().putAll(getConfigJson());
flinkConfig.getConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism));
flinkConfig.setJobName(getJobName());

gatewayConfig = GatewayConfig.build(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import org.dinky.gateway.result.TestResult;
import org.dinky.utils.TextUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
Expand Down Expand Up @@ -86,6 +88,12 @@ public void init() {

protected void initConfig() {
flinkConfigPath = config.getClusterConfig().getFlinkConfigPath();

// The user-defined flink conf path overrides the flink conf path parameter.
if (StringUtils.isNotBlank(flinkConfigPath)) {
addConfigParas(DeploymentOptionsInternal.CONF_DIR, flinkConfigPath);
}

flinkConfig = config.getFlinkConfig();
String jobName = flinkConfig.getJobName();
if (TextUtil.isEmpty(jobName)) {
Expand Down Expand Up @@ -222,9 +230,9 @@ public TestResult test() {
}
return TestResult.success();
} catch (Exception e) {
logger.error(Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e);
logger.error(Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e);
return TestResult.fail(
StrFormatter.format("{}:{}", Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e.getMessage()));
StrFormatter.format("{}:{}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e.getMessage()));
} finally {
close();
}
Expand Down

0 comments on commit 06744f1

Please sign in to comment.