diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java index fe722a4a90d..c23c2317211 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java @@ -37,6 +37,7 @@ import org.apache.inlong.audit.service.consume.KafkaConsume; import org.apache.inlong.audit.service.consume.PulsarConsume; import org.apache.inlong.audit.service.consume.TubeConsume; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.pojo.audit.AuditConfigRequest; import org.apache.inlong.common.pojo.audit.MQInfo; import org.slf4j.Logger; @@ -45,7 +46,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -70,6 +70,9 @@ public class AuditMsgConsumerServer implements InitializingBean { private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties"; + // interval time of getting mq config + private static final int INTERVAL_MS = 5000; + private final CloseableHttpClient httpClient = HttpClientBuilder.create().build(); private final Gson gson = new Gson(); @@ -83,24 +86,25 @@ public void afterPropertiesSet() { List insertServiceList = this.getInsertServiceList(); for (MQInfo mqInfo : mqInfoList) { - if (mqConfig.isPulsar()) { + if (mqConfig.isPulsar() && MQType.PULSAR.equals(mqInfo.getMqType())) { mqConfig.setPulsarServerUrl(mqInfo.getUrl()); mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig); break; - } else if (mqConfig.isTube()) { + } else if (mqConfig.isTube() && MQType.TUBEMQ.equals(mqInfo.getMqType())) { mqConfig.setTubeMasterList(mqInfo.getUrl()); mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig); break; - } else if (mqConfig.isKafka()) { + } else if (mqConfig.isKafka() && MQType.KAFKA.equals(mqInfo.getMqType())) { mqConfig.setKafkaServerUrl(mqInfo.getUrl()); mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig); break; - } else { - LOG.error("Unknown MessageQueue {}", mqConfig.getMqType()); - return; } } + if (mqConsume == null) { + LOG.error("Unknown MessageQueue {}", mqConfig.getMqType()); + } + if (storeConfig.isElasticsearchStore()) { esService.startTimerRoutine(); } @@ -133,19 +137,23 @@ private List getInsertServiceList() { private List getClusterFromManager() { Properties properties = new Properties(); + List mqConfig; try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) { properties.load(inputStream); String managerHosts = properties.getProperty("manager.hosts"); String clusterTag = properties.getProperty("proxy.cluster.tag"); String[] hostList = StringUtils.split(managerHosts, ","); for (String host : hostList) { - List mqConfig = getMQConfig(host, clusterTag); - if (ObjectUtils.isNotEmpty(mqConfig)) { - LOG.info("return mqConfig"); - return mqConfig; + while (true) { + mqConfig = getMQConfig(host, clusterTag); + if (ObjectUtils.isNotEmpty(mqConfig)) { + return mqConfig; + } + LOG.info("MQ config may not be registered yet, wait for 5s and try again"); + Thread.sleep(INTERVAL_MS); } } - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } return null;