diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java index 2c58cbf7acc..f53cc2fa91e 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java @@ -109,10 +109,13 @@ public void reload() { return; } LOG.info("get new SortTaskConfig:taskName:{}", taskName); - ClsNodeConfig requestNodeConfig = (ClsNodeConfig) newTaskConfig.getNodeConfig(); - if (clsNodeConfig == null || requestNodeConfig.getVersion() > clsNodeConfig.getVersion()) { - this.clsNodeConfig = requestNodeConfig; + if (newTaskConfig != null) { + ClsNodeConfig requestNodeConfig = (ClsNodeConfig) newTaskConfig.getNodeConfig(); + if (clsNodeConfig == null || requestNodeConfig.getVersion() > clsNodeConfig.getVersion()) { + this.clsNodeConfig = requestNodeConfig; + } } + this.taskConfig = newTaskConfig; this.sortTaskConfig = newSortTaskConfig; diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java index 9da300057bc..f1f0cccf88f 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java @@ -143,10 +143,13 @@ public void reload() { } LOG.info("get new SortTaskConfig:taskName:{}", taskName); - EsNodeConfig requestNodeConfig = (EsNodeConfig) newTaskConfig.getNodeConfig(); - if (esNodeConfig == null || requestNodeConfig.getVersion() > esNodeConfig.getVersion()) { - this.esNodeConfig = requestNodeConfig; + if (newTaskConfig != null) { + EsNodeConfig requestNodeConfig = (EsNodeConfig) newTaskConfig.getNodeConfig(); + if (esNodeConfig == null || requestNodeConfig.getVersion() > esNodeConfig.getVersion()) { + this.esNodeConfig = requestNodeConfig; + } } + this.taskConfig = newTaskConfig; this.sortTaskConfig = newSortTaskConfig; diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java index f1dbdeb5283..8fe9dc58333 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java @@ -76,9 +76,12 @@ public void reload() { LOG.info("Same sortTaskConfig, do nothing."); return; } - KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig) newTaskConfig.getNodeConfig(); - if (kafkaNodeConfig == null || requestNodeConfig.getVersion() > kafkaNodeConfig.getVersion()) { - this.kafkaNodeConfig = requestNodeConfig; + + if (newTaskConfig != null) { + KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig) newTaskConfig.getNodeConfig(); + if (kafkaNodeConfig == null || requestNodeConfig.getVersion() > kafkaNodeConfig.getVersion()) { + this.kafkaNodeConfig = requestNodeConfig; + } } this.taskConfig = newTaskConfig; @@ -99,6 +102,9 @@ public void reload() { } public Map fromTaskConfig(TaskConfig taskConfig) { + if (taskConfig == null) { + return new HashMap<>(); + } return taskConfig.getClusterTagConfigs() .stream() .map(ClusterTagConfig::getDataFlowConfigs) @@ -111,6 +117,10 @@ public Map fromTaskConfig(TaskConfig taskConfig) { } public Map fromSortTaskConfig(SortTaskConfig sortTaskConfig) { + if (sortTaskConfig == null) { + return new HashMap<>(); + } + List> idList = sortTaskConfig.getIdParams(); Map newIdConfigMap = new ConcurrentHashMap<>(); for (Map idParam : idList) { diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java index a686d50214e..74284e3210b 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java @@ -71,9 +71,11 @@ public void reload() { return; } - PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newTaskConfig.getNodeConfig(); - if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) { - this.pulsarNodeConfig = requestNodeConfig; + if (newTaskConfig != null) { + PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newTaskConfig.getNodeConfig(); + if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) { + this.pulsarNodeConfig = requestNodeConfig; + } } this.taskConfig = newTaskConfig; @@ -94,6 +96,9 @@ public void reload() { } public Map fromTaskConfig(TaskConfig taskConfig) { + if (taskConfig == null) { + return new HashMap<>(); + } return taskConfig.getClusterTagConfigs() .stream() .map(ClusterTagConfig::getDataFlowConfigs) @@ -106,6 +111,9 @@ public Map fromTaskConfig(TaskConfig taskConfig) { } public Map fromSortTaskConfig(SortTaskConfig sortTaskConfig) { + if (sortTaskConfig == null) { + return new HashMap<>(); + } Map newIdConfigMap = new ConcurrentHashMap<>(); List> idList = sortTaskConfig.getIdParams(); for (Map idParam : idList) {