Skip to content

Commit

Permalink
[INLONG-10604][Sort] Fix NPE when unified configuration is not exits (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Jul 11, 2024
1 parent 903a4e9 commit 28c0727
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@ public void reload() {
return;
}
LOG.info("get new SortTaskConfig:taskName:{}", taskName);
ClsNodeConfig requestNodeConfig = (ClsNodeConfig) newTaskConfig.getNodeConfig();
if (clsNodeConfig == null || requestNodeConfig.getVersion() > clsNodeConfig.getVersion()) {
this.clsNodeConfig = requestNodeConfig;
if (newTaskConfig != null) {
ClsNodeConfig requestNodeConfig = (ClsNodeConfig) newTaskConfig.getNodeConfig();
if (clsNodeConfig == null || requestNodeConfig.getVersion() > clsNodeConfig.getVersion()) {
this.clsNodeConfig = requestNodeConfig;
}
}

this.taskConfig = newTaskConfig;
this.sortTaskConfig = newSortTaskConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,13 @@ public void reload() {
}
LOG.info("get new SortTaskConfig:taskName:{}", taskName);

EsNodeConfig requestNodeConfig = (EsNodeConfig) newTaskConfig.getNodeConfig();
if (esNodeConfig == null || requestNodeConfig.getVersion() > esNodeConfig.getVersion()) {
this.esNodeConfig = requestNodeConfig;
if (newTaskConfig != null) {
EsNodeConfig requestNodeConfig = (EsNodeConfig) newTaskConfig.getNodeConfig();
if (esNodeConfig == null || requestNodeConfig.getVersion() > esNodeConfig.getVersion()) {
this.esNodeConfig = requestNodeConfig;
}
}

this.taskConfig = newTaskConfig;
this.sortTaskConfig = newSortTaskConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@ public void reload() {
LOG.info("Same sortTaskConfig, do nothing.");
return;
}
KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig) newTaskConfig.getNodeConfig();
if (kafkaNodeConfig == null || requestNodeConfig.getVersion() > kafkaNodeConfig.getVersion()) {
this.kafkaNodeConfig = requestNodeConfig;

if (newTaskConfig != null) {
KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig) newTaskConfig.getNodeConfig();
if (kafkaNodeConfig == null || requestNodeConfig.getVersion() > kafkaNodeConfig.getVersion()) {
this.kafkaNodeConfig = requestNodeConfig;
}
}

this.taskConfig = newTaskConfig;
Expand All @@ -99,6 +102,9 @@ public void reload() {
}

public Map<String, KafkaIdConfig> fromTaskConfig(TaskConfig taskConfig) {
if (taskConfig == null) {
return new HashMap<>();
}
return taskConfig.getClusterTagConfigs()
.stream()
.map(ClusterTagConfig::getDataFlowConfigs)
Expand All @@ -111,6 +117,10 @@ public Map<String, KafkaIdConfig> fromTaskConfig(TaskConfig taskConfig) {
}

public Map<String, KafkaIdConfig> fromSortTaskConfig(SortTaskConfig sortTaskConfig) {
if (sortTaskConfig == null) {
return new HashMap<>();
}

List<Map<String, String>> idList = sortTaskConfig.getIdParams();
Map<String, KafkaIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
for (Map<String, String> idParam : idList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ public void reload() {
return;
}

PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newTaskConfig.getNodeConfig();
if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) {
this.pulsarNodeConfig = requestNodeConfig;
if (newTaskConfig != null) {
PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newTaskConfig.getNodeConfig();
if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) {
this.pulsarNodeConfig = requestNodeConfig;
}
}

this.taskConfig = newTaskConfig;
Expand All @@ -94,6 +96,9 @@ public void reload() {
}

public Map<String, PulsarIdConfig> fromTaskConfig(TaskConfig taskConfig) {
if (taskConfig == null) {
return new HashMap<>();
}
return taskConfig.getClusterTagConfigs()
.stream()
.map(ClusterTagConfig::getDataFlowConfigs)
Expand All @@ -106,6 +111,9 @@ public Map<String, PulsarIdConfig> fromTaskConfig(TaskConfig taskConfig) {
}

public Map<String, PulsarIdConfig> fromSortTaskConfig(SortTaskConfig sortTaskConfig) {
if (sortTaskConfig == null) {
return new HashMap<>();
}
Map<String, PulsarIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
List<Map<String, String>> idList = sortTaskConfig.getIdParams();
for (Map<String, String> idParam : idList) {
Expand Down

0 comments on commit 28c0727

Please sign in to comment.