From e4d5007fc56240f41f9233b440c6cde75942c69f Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Thu, 11 Jul 2024 19:19:19 +0800 Subject: [PATCH] [INLONG-10601][Manager] Optimize the agent task configuration process (#10602) --- .../common/pojo/agent/AgentConfigInfo.java | 1 + .../inlong/common/pojo/agent/TaskResult.java | 1 + .../dao/entity/AgentTaskConfigEntity.java | 47 +++ .../mapper/AgentTaskConfigEntityMapper.java | 45 +++ .../mappers/AgentTaskConfigEntityMapper.xml | 100 +++++++ ...ortConfigLoader.java => ConfigLoader.java} | 10 +- .../service/core/impl/AgentServiceImpl.java | 136 ++++----- ...gLoaderImpl.java => ConfigLoaderImpl.java} | 17 +- .../core/impl/SortClusterServiceImpl.java | 16 +- .../service/core/impl/SortServiceImpl.java | 6 +- .../core/impl/SortSourceServiceImpl.java | 6 +- .../listener/StreamTaskListenerFactory.java | 4 + .../listener/source/SourceStartListener.java | 86 ++++++ .../repository/DataProxyConfigRepository.java | 8 +- .../source/AbstractSourceOperator.java | 275 ++++++++++++++++++ .../service/source/StreamSourceOperator.java | 8 + .../service/stream/TemplateServiceImpl.java | 4 +- .../resources/h2/apache_inlong_manager.sql | 20 ++ .../manager-web/sql/apache_inlong_manager.sql | 21 ++ .../manager-web/sql/changes-1.13.0.sql | 21 ++ 20 files changed, 732 insertions(+), 100 deletions(-) create mode 100644 inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java create mode 100644 inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java create mode 100644 inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml rename inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/{SortConfigLoader.java => ConfigLoader.java} (93%) rename inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/{SortConfigLoaderImpl.java => ConfigLoaderImpl.java} (90%) create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java index 2399c9657c6..ff8cd4df3ed 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java @@ -34,6 +34,7 @@ public class AgentConfigInfo { AgentResponseCode code; private String zkUrl; private AgentClusterInfo cluster; + private Integer version; private String md5; @Data diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java index 2fcbec919d1..7970e345cb9 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java @@ -36,6 +36,7 @@ public class TaskResult { private List cmdConfigs; private List dataConfigs; private String md5; + private Integer version; AgentResponseCode code; } \ No newline at end of file diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java new file mode 100644 index 00000000000..cc8c3ab529b --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * Agent task config entity, including agent ip, cluster name, etc. + */ +@Data +public class AgentTaskConfigEntity implements Serializable { + + private static final long serialVersionUID = 1L; + private Integer id; + private String clusterName; + private String agentIp; + + private String configParams; + + private String taskParams; + + private Integer isDeleted; + private String creator; + private String modifier; + private Date createTime; + private Date modifyTime; + private Integer version; + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java new file mode 100644 index 00000000000..d673a16eac7 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.mapper; + +import org.apache.inlong.manager.common.tenant.MultiTenantQuery; +import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity; + +import org.apache.ibatis.annotations.Options; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.cursor.Cursor; +import org.apache.ibatis.mapping.ResultSetType; +import org.springframework.stereotype.Repository; + +@Repository +public interface AgentTaskConfigEntityMapper { + + int insert(AgentTaskConfigEntity record); + + AgentTaskConfigEntity selectByPrimaryKey(Integer id); + + AgentTaskConfigEntity selectByIdentifier(@Param("agentIp") String agentIp, + @Param("clusterName") String clusterName); + + int updateByIdSelective(AgentTaskConfigEntity record); + + @MultiTenantQuery(with = false) + @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE) + Cursor selectAllAgentTaskConfigs(); + +} diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml new file mode 100644 index 00000000000..9a5f237c8d9 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml @@ -0,0 +1,100 @@ + + + + + + + + + + + + + + + + + + + + + id, agent_ip, cluster_name, config_params, task_params, is_deleted, creator, modifier, create_time, modify_time, version + + + insert into agent_task_config (id, agent_ip, cluster_name, + config_params, task_params, + creator, modifier) + values (#{id, jdbcType=INTEGER}, #{agentIp, jdbcType=VARCHAR}, #{clusterName, jdbcType=VARCHAR}, + #{configParams, jdbcType=VARCHAR}, #{taskParams, jdbcType=VARCHAR}, + #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR}) + + + + + + + update agent_task_config + + + agent_ip = #{agentIp,jdbcType=VARCHAR}, + + + cluster_name = #{clusterName,jdbcType=VARCHAR}, + + + config_params = #{configParams,jdbcType=VARCHAR}, + + + task_params = #{taskParams,jdbcType=VARCHAR}, + + + is_deleted = #{isDeleted,jdbcType=INTEGER}, + + + modifier = #{modifier,jdbcType=VARCHAR}, + + version = #{version,jdbcType=INTEGER} + 1 + + where id = #{id,jdbcType=INTEGER} + and version = #{version,jdbcType=INTEGER} + + diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java similarity index 93% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java index 2923e6f34da..a491838c685 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.core; +import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity; import org.apache.inlong.manager.dao.entity.ClusterConfigEntity; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; @@ -35,7 +36,7 @@ /** * Loader for sort service to load configs thought Cursor */ -public interface SortConfigLoader { +public interface ConfigLoader { /** * Load all clusters by cursor @@ -124,4 +125,11 @@ public interface SortConfigLoader { */ List loadAllClusterConfigEntity(); + /** + * Load all agent task config info + * + * @return List of agent task config info + */ + List loadAllAgentTaskConfigEntity(); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 37a19beb14d..f7d669151a3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -47,6 +47,7 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; @@ -63,7 +64,6 @@ import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; -import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterInfo; import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest; import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; @@ -73,6 +73,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.core.AgentService; +import org.apache.inlong.manager.service.core.ConfigLoader; import org.apache.inlong.manager.service.source.SourceOperatorFactory; import org.apache.inlong.manager.service.source.SourceSnapshotOperator; import org.apache.inlong.manager.service.source.StreamSourceOperator; @@ -107,9 +108,11 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -144,10 +147,8 @@ public class AgentServiceImpl implements AgentService { new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(), new CallerRunsPolicy()); - @Getter - private LoadingCache taskCache; - @Getter - private LoadingCache agentConfigCache; + private Map taskConfigMap = new ConcurrentHashMap<>(); + private Map agentConfigMap = new ConcurrentHashMap<>(); @Getter private LoadingCache moduleConfigCache; @@ -191,6 +192,8 @@ public class AgentServiceImpl implements AgentService { private PackageConfigEntityMapper packageConfigEntityMapper; @Autowired private InlongClusterService clusterService; + @Autowired + private ConfigLoader configLoader; /** * Start the update task @@ -201,12 +204,6 @@ private void startHeartbeatTask() { // The expiry time of cluster info cache must be greater than taskCache cache // because the eviction handler needs to query cluster info cache long expireTime = 10 * 5; - taskCache = Caffeine.newBuilder() - .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS) - .build(this::fetchTask); - agentConfigCache = Caffeine.newBuilder() - .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS) - .build(this::fetchAgentConfig); LOGGER.debug("start to reload config for installer."); try { moduleConfigCache = Caffeine.newBuilder() @@ -215,6 +212,12 @@ private void startHeartbeatTask() { } catch (Throwable t) { LOGGER.error("fail to reload all config for installer ", t); } + try { + reload(); + setReloadTimer(); + } catch (Exception e) { + LOGGER.error("load agent task config failed", e); + } LOGGER.debug("end to reload config for installer"); if (updateTaskTimeoutEnabled) { ThreadFactory factory = new ThreadFactoryBuilder() @@ -266,6 +269,11 @@ private void startHeartbeatTask() { } } + private void setReloadTimer() { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleWithFixedDelay(this::reload, 60000L, 60000L, TimeUnit.MILLISECONDS); + } + @Override public Boolean reportSnapshot(TaskSnapshotRequest request) { return snapshotOperator.snapshot(request); @@ -293,6 +301,42 @@ public void report(TaskRequest request) { } } + public void reload() { + LOGGER.debug("start to reload agent task config."); + try { + Map newTaskConfigMap = new ConcurrentHashMap<>(); + Map newAgentConfigMap = new ConcurrentHashMap<>(); + List agentTaskConfigEntityList = configLoader.loadAllAgentTaskConfigEntity(); + agentTaskConfigEntityList.forEach(agentTaskConfigEntity -> { + try { + String key = agentTaskConfigEntity.getAgentIp() + InlongConstants.UNDERSCORE + + agentTaskConfigEntity.getClusterName(); + TaskResult taskResult = JsonUtils.parseObject(agentTaskConfigEntity.getTaskParams(), + TaskResult.class); + if (taskResult != null) { + taskResult.setVersion(agentTaskConfigEntity.getVersion()); + newTaskConfigMap.putIfAbsent(key, taskResult); + } + AgentConfigInfo agentConfigInfo = JsonUtils.parseObject(agentTaskConfigEntity.getConfigParams(), + AgentConfigInfo.class); + if (agentConfigInfo != null) { + agentConfigInfo.setVersion(agentTaskConfigEntity.getVersion()); + newAgentConfigMap.putIfAbsent(key, agentConfigInfo); + } + } catch (Exception e) { + LOGGER.error("failed to get agent task config for agent ip={}, cluster name={}", + agentTaskConfigEntity.getAgentIp(), agentTaskConfigEntity.getClusterName()); + } + + }); + taskConfigMap = newTaskConfigMap; + agentConfigMap = newAgentConfigMap; + } catch (Throwable t) { + LOGGER.error("failed to reload all agent task config", t); + } + LOGGER.debug("end to reload agent task config"); + } + /** * Update task status by command. * @@ -337,7 +381,8 @@ private void updateTaskStatus(CommandEntity command) { @Override public AgentConfigInfo getAgentConfig(AgentConfigRequest request) { LOGGER.debug("begin to get agent config info for {}", request); - AgentConfigInfo agentConfigInfo = agentConfigCache.get(request); + String key = request.getIp() + InlongConstants.UNDERSCORE + request.getClusterName(); + AgentConfigInfo agentConfigInfo = agentConfigMap.get(key); if (agentConfigInfo == null) { return null; } @@ -370,7 +415,8 @@ public TaskResult getTaskResult(TaskRequest request) { @Override public TaskResult getExistTaskConfig(TaskRequest request) { LOGGER.debug("begin to get all exist task by request={}", request); - TaskResult taskResult = taskCache.get(request); + String key = request.getAgentIp() + InlongConstants.UNDERSCORE + request.getClusterName(); + TaskResult taskResult = taskConfigMap.get(key); if (taskResult == null) { return null; } @@ -825,70 +871,6 @@ private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEnt return sourceGroups.stream().anyMatch(clusterNodeGroups::contains); } - private TaskResult fetchTask(TaskRequest request) { - final String clusterName = request.getClusterName(); - final String ip = request.getAgentIp(); - final String uuid = request.getUuid(); - List normalSourceEntities = sourceMapper.selectByStatusAndCluster( - SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()), - clusterName, ip, uuid); - List taskLists = new ArrayList<>(normalSourceEntities); - List stopSourceEntities = sourceMapper.selectByStatusAndCluster( - SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()), - clusterName, ip, uuid); - taskLists.addAll(stopSourceEntities); - LOGGER.debug("success to add task : {}", taskLists.size()); - List runningTaskConfig = Lists.newArrayList(); - try { - List cmdConfigs = getAgentCmdConfigs(request); - if (CollectionUtils.isEmpty(taskLists)) { - return TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build(); - } - for (StreamSourceEntity sourceEntity : taskLists) { - int op = getOp(sourceEntity.getStatus()); - DataConfig dataConfig = getDataConfig(sourceEntity, op); - runningTaskConfig.add(dataConfig); - } - TaskResult taskResult = TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build(); - String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult)); - taskResult.setMd5(md5); - taskResult.setCode(AgentResponseCode.SUCCESS); - return taskResult; - } catch (Exception e) { - LOGGER.error("get all exist task failed:", e); - throw new BusinessException("get all exist task failed:" + e.getMessage()); - } - } - - private AgentConfigInfo fetchAgentConfig(AgentConfigRequest request) { - LOGGER.debug("begin to get agent config info for {}", request); - AgentConfigInfo agentConfigInfo = new AgentConfigInfo(); - Set tagSet = new HashSet<>(16); - tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA))); - List clusterTagList = new ArrayList<>(tagSet); - ClusterPageRequest pageRequest = ClusterPageRequest.builder() - .type(ClusterType.AGENT_ZK) - .clusterTagList(clusterTagList) - .build(); - List agentZkCluster = clusterMapper.selectByCondition(pageRequest); - if (CollectionUtils.isNotEmpty(agentZkCluster)) { - agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl()); - } - - AgentClusterInfo clusterInfo = (AgentClusterInfo) clusterService.getOne( - null, request.getClusterName(), ClusterType.AGENT); - agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder() - .parentId(clusterInfo.getId()) - .clusterName(clusterInfo.getName()) - .build()); - String jsonStr = GSON.toJson(agentConfigInfo); - String configMd5 = DigestUtils.md5Hex(jsonStr); - agentConfigInfo.setMd5(configMd5); - agentConfigInfo.setCode(AgentResponseCode.SUCCESS); - LOGGER.debug("success to get agent config info for: {}, result: {}", request, agentConfigInfo); - return agentConfigInfo; - } - private ConfigResult loadModuleConfigs(ConfigRequest request) { final String clusterName = request.getClusterName(); final String ip = request.getLocalIp(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java similarity index 90% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java index 03b5f24977d..649586bb216 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java @@ -17,12 +17,14 @@ package org.apache.inlong.manager.service.core.impl; +import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity; import org.apache.inlong.manager.dao.entity.ClusterConfigEntity; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity; import org.apache.inlong.manager.dao.entity.SortConfigEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper; import org.apache.inlong.manager.dao.mapper.ClusterConfigEntityMapper; import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; @@ -39,7 +41,7 @@ import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo; -import org.apache.inlong.manager.service.core.SortConfigLoader; +import org.apache.inlong.manager.service.core.ConfigLoader; import org.apache.ibatis.cursor.Cursor; import org.springframework.beans.factory.annotation.Autowired; @@ -50,7 +52,7 @@ import java.util.List; @Service -public class SortConfigLoaderImpl implements SortConfigLoader { +public class ConfigLoaderImpl implements ConfigLoader { @Autowired private InlongClusterEntityMapper clusterEntityMapper; @@ -72,6 +74,8 @@ public class SortConfigLoaderImpl implements SortConfigLoader { private SortConfigEntityMapper sortConfigEntityMapper; @Autowired private ClusterConfigEntityMapper clusterConfigEntityMapper; + @Autowired + private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper; @Transactional @Override @@ -180,4 +184,13 @@ public List loadAllClusterConfigEntity() { cursor.forEach(allClusterConfigs::add); return allClusterConfigs; } + + @Transactional + @Override + public List loadAllAgentTaskConfigEntity() { + Cursor cursor = agentTaskConfigEntityMapper.selectAllAgentTaskConfigs(); + List agentTaskConfigEntityList = new ArrayList<>(); + cursor.forEach(agentTaskConfigEntityList::add); + return agentTaskConfigEntityList; + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java index 24ee49f7576..36328007e15 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java @@ -26,8 +26,8 @@ import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo; +import org.apache.inlong.manager.service.core.ConfigLoader; import org.apache.inlong.manager.service.core.SortClusterService; -import org.apache.inlong.manager.service.core.SortConfigLoader; import org.apache.inlong.manager.service.node.DataNodeOperator; import org.apache.inlong.manager.service.node.DataNodeOperatorFactory; import org.apache.inlong.manager.service.sink.SinkOperatorFactory; @@ -93,7 +93,7 @@ public class SortClusterServiceImpl implements SortClusterService { private long reloadInterval; @Autowired - private SortConfigLoader sortConfigLoader; + private ConfigLoader configLoader; @Autowired private SinkOperatorFactory sinkOperatorFactory; @Autowired @@ -171,16 +171,16 @@ public SortClusterResponse getClusterConfig(String clusterName, String md5) { private void reloadAllClusterConfig() { // load all fields info - List fieldInfos = sortConfigLoader.loadAllFields(); + List fieldInfos = configLoader.loadAllFields(); fieldMap = new HashMap<>(); fieldInfos.forEach(info -> { List fields = fieldMap.computeIfAbsent(info.getSinkId(), k -> new ArrayList<>()); fields.add(info.getFieldName()); }); - List sinkEntities = sortConfigLoader.loadAllStreamSinkEntity(); + List sinkEntities = configLoader.loadAllStreamSinkEntity(); // get all task under a given cluster, has been reduced into cluster and task. - List tasks = sortConfigLoader.loadAllTask(); + List tasks = configLoader.loadAllTask(); Map> clusterTaskMap = tasks.stream() .filter(dto -> StringUtils.isNotBlank(dto.getSortClusterName()) && StringUtils.isNotBlank(dto.getSortTaskName()) @@ -189,7 +189,7 @@ private void reloadAllClusterConfig() { .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName)); // reload all streams - allStreams = sortConfigLoader.loadAllStreams() + allStreams = configLoader.loadAllStreams() .stream() .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId, Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info))); @@ -202,7 +202,7 @@ private void reloadAllClusterConfig() { .collect(Collectors.groupingBy(StreamSinkEntity::getSortTaskName)); // get all data nodes and group by node name - List dataNodeEntities = sortConfigLoader.loadAllDataNodeEntity(); + List dataNodeEntities = configLoader.loadAllDataNodeEntity(); Map task2DataNodeMap = dataNodeEntities.stream() .filter(entity -> StringUtils.isNotBlank(entity.getName())) .map(entity -> { @@ -300,6 +300,6 @@ private Map parseSinkParams(DataNodeInfo nodeInfo) { */ private void setReloadTimer() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS); + executorService.scheduleWithFixedDelay(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java index 2f87d3b92e2..d01911acb9a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java @@ -42,8 +42,8 @@ import org.apache.inlong.manager.pojo.sort.SortStatusInfo; import org.apache.inlong.manager.pojo.sort.SortStatusRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.service.core.ConfigLoader; import org.apache.inlong.manager.service.core.SortClusterService; -import org.apache.inlong.manager.service.core.SortConfigLoader; import org.apache.inlong.manager.service.core.SortService; import org.apache.inlong.manager.service.core.SortSourceService; import org.apache.inlong.manager.service.group.InlongGroupService; @@ -99,7 +99,7 @@ public class SortServiceImpl implements SortService, PluginBinder { @Autowired private InlongStreamService streamService; @Autowired - private SortConfigLoader configLoader; + private ConfigLoader configLoader; @Autowired private DataNodeOperatorFactory dataNodeOperatorFactory; /** @@ -152,7 +152,7 @@ public void reload() { private void setReloadTimer() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); long reloadInterval = 60000L; - executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS); + executorService.scheduleWithFixedDelay(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java index 247f2706857..9d6fe9ceb5d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java @@ -33,7 +33,7 @@ import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo; -import org.apache.inlong.manager.service.core.SortConfigLoader; +import org.apache.inlong.manager.service.core.ConfigLoader; import org.apache.inlong.manager.service.core.SortSourceService; import com.google.gson.Gson; @@ -111,7 +111,7 @@ public class SortSourceServiceImpl implements SortSourceService { private Map>> streamSinkMap; @Autowired - private SortConfigLoader configLoader; + private ConfigLoader configLoader; @PostConstruct public void initialize() { @@ -458,6 +458,6 @@ private CacheZone parsePulsarZone( private void setReloadTimer() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); long reloadInterval = 60000L; - executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS); + executorService.scheduleWithFixedDelay(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java index 5069423c134..ba729202af4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener; import org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener; import org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener; +import org.apache.inlong.manager.service.listener.source.SourceStartListener; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.definition.ServiceTaskType; import org.apache.inlong.manager.workflow.definition.TaskListenerFactory; @@ -60,6 +61,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact private StreamSortConfigListener streamSortConfigListener; @Autowired private StreamSinkResourceListener sinkResourceListener; + @Autowired + private SourceStartListener sourceStartListener; @PostConstruct public void init() { @@ -70,6 +73,7 @@ public void init() { sortOperateListeners.add(streamSortConfigListener); sinkOperateListeners = new LinkedList<>(); sinkOperateListeners.add(sinkResourceListener); + sourceOperateListeners.add(sourceStartListener); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java new file mode 100644 index 00000000000..2ee6f504ae3 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.listener.source; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.enums.GroupOperateType; +import org.apache.inlong.manager.common.enums.TaskEvent; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; +import org.apache.inlong.manager.service.source.SourceOperatorFactory; +import org.apache.inlong.manager.service.source.StreamSourceOperator; +import org.apache.inlong.manager.service.source.StreamSourceService; +import org.apache.inlong.manager.workflow.WorkflowContext; +import org.apache.inlong.manager.workflow.event.ListenerResult; +import org.apache.inlong.manager.workflow.event.task.SourceOperateListener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +public class SourceStartListener implements SourceOperateListener { + + @Autowired + protected StreamSourceService streamSourceService; + @Autowired + private SourceOperatorFactory operatorFactory; + + @Override + public String name() { + return getClass().getSimpleName(); + } + + @Override + public TaskEvent event() { + return TaskEvent.COMPLETE; + } + + @Override + public boolean accept(WorkflowContext context) { + if (isGroupProcessForm(context)) { + return false; + } + StreamResourceProcessForm processForm = (StreamResourceProcessForm) context.getProcessForm(); + return InlongConstants.STANDARD_MODE.equals(processForm.getGroupInfo().getInlongGroupMode()) + && processForm.getGroupOperateType() == GroupOperateType.INIT; + } + + @Override + public ListenerResult listen(WorkflowContext context) throws Exception { + StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm(); + String operator = context.getOperator(); + InlongStreamInfo streamInfo = form.getStreamInfo(); + final String groupId = streamInfo.getInlongGroupId(); + final String streamId = streamInfo.getInlongStreamId(); + log.info("begin to update agent task config for groupId={}, streamId={}", groupId, streamId); + List sources = streamSourceService.listSource(groupId, streamId); + for (StreamSource source : sources) { + SourceRequest request = source.genSourceRequest(); + StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType()); + sourceOperator.updateAgentTaskConfig(request, operator); + } + log.info("success to update agent task config for groupId={}, streamId={}", groupId, streamId); + return ListenerResult.success(); + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java index d4c951c2090..d4087d8976e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java @@ -46,7 +46,7 @@ import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId; import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster; import org.apache.inlong.manager.pojo.sink.SinkPageRequest; -import org.apache.inlong.manager.service.core.SortConfigLoader; +import org.apache.inlong.manager.service.core.ConfigLoader; import com.google.common.base.Splitter; import com.google.common.collect.Sets; @@ -122,7 +122,7 @@ public class DataProxyConfigRepository implements IRepository { @Autowired private StreamSinkEntityMapper streamSinkMapper; @Autowired - private SortConfigLoader sortConfigLoader; + private ConfigLoader configLoader; @PostConstruct public void initialize() { @@ -364,7 +364,7 @@ private void reloadInlongId(Map proxyClusterMap) { Map> groupParams = new HashMap<>(); groupIdMap.forEach((k, v) -> groupParams.put(k, fromJsonToMap(v.getExtParams()))); // reload inlong group ext - List groupExtCursor = sortConfigLoader + List groupExtCursor = configLoader .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG); groupExtCursor.forEach(v -> groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>()) .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue())); @@ -390,7 +390,7 @@ private void reloadInlongId(Map proxyClusterMap) { streamParams.put(k, params); }); // reload inlong stream ext - List streamExtCursor = sortConfigLoader + List streamExtCursor = configLoader .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE); streamExtCursor.forEach(v -> streamParams .computeIfAbsent(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), k -> new HashMap<>()) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 39d3845209c..2ecd79fb37e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -17,28 +17,58 @@ package org.apache.inlong.manager.service.source; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.enums.DataTypeEnum; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.enums.TaskTypeEnum; +import org.apache.inlong.common.pojo.agent.AgentConfigInfo; +import org.apache.inlong.common.pojo.agent.AgentResponseCode; +import org.apache.inlong.common.pojo.agent.CmdConfig; +import org.apache.inlong.common.pojo.agent.DataConfig; +import org.apache.inlong.common.pojo.agent.TaskResult; +import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo; +import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; +import org.apache.inlong.manager.dao.entity.InlongGroupEntity; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity; +import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper; +import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper; +import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; +import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; +import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper; +import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.service.node.DataNodeService; +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.pagehelper.Page; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -48,8 +78,17 @@ import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.inlong.manager.common.consts.InlongConstants.DOT; +import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams; /** * Default operator of stream source. @@ -57,6 +96,8 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceOperator.class); + private static final Gson GSON = new Gson(); + private static final int MODULUS_100 = 100; @Autowired protected StreamSourceEntityMapper sourceMapper; @@ -66,6 +107,18 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { protected InlongStreamFieldEntityMapper streamFieldMapper; @Autowired protected DataNodeService dataNodeService; + @Autowired + private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper; + @Autowired + private InlongGroupEntityMapper groupMapper; + @Autowired + private InlongClusterEntityMapper clusterMapper; + @Autowired + private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper; + @Autowired + private InlongStreamEntityMapper streamMapper; + @Autowired + private ObjectMapper objectMapper; /** * Getting the source type. @@ -109,6 +162,9 @@ public Integer saveOpt(SourceRequest request, Integer groupStatus, String operat if (request.getEnableSyncSchema()) { syncSourceFieldInfo(request, operator); } + if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { + updateAgentTaskConfig(request, operator); + } return entity.getId(); } @@ -207,6 +263,9 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM } updateFieldOpt(entity, request.getFieldList()); LOGGER.debug("success to update source of type={}", request.getSourceType()); + if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { + updateAgentTaskConfig(request, operator); + } } @Override @@ -232,6 +291,7 @@ public void stopOpt(SourceRequest request, String operator) { curEntity.getVersion()); throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } + updateAgentTaskConfig(request, operator); } @Override @@ -346,4 +406,219 @@ public void syncSourceFieldInfo(SourceRequest request, String operator) { public Integer addDataAddTask(DataAddTaskRequest request, String operator) { throw new BusinessException(String.format("not support data add task for type =%s", request.getSourceType())); } + + @Override + public void updateAgentTaskConfig(SourceRequest request, String operator) { + try { + if (SourceType.AUTO_PUSH.equals(request.getSourceType())) { + return; + } + final String clusterName = request.getInlongClusterName(); + final String ip = request.getAgentIp(); + final String uuid = request.getUuid(); + if (StringUtils.isBlank(clusterName) || StringUtils.isBlank(ip)) { + LOGGER.warn("skip update agent task config where cluster name or ip is null for request={}", request); + return; + } + AgentTaskConfigEntity existEntity = agentTaskConfigEntityMapper.selectByIdentifier(ip, clusterName); + AgentTaskConfigEntity agentTaskConfigEntity = new AgentTaskConfigEntity(); + if (existEntity != null) { + agentTaskConfigEntity = CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true); + } + List normalSourceEntities = sourceMapper.selectByStatusAndCluster( + SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode) + .collect(Collectors.toList()), + clusterName, ip, uuid); + List taskLists = new ArrayList<>(normalSourceEntities); + List stopSourceEntities = sourceMapper.selectByStatusAndCluster( + SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode) + .collect(Collectors.toList()), + clusterName, ip, uuid); + taskLists.addAll(stopSourceEntities); + LOGGER.debug("success to add task : {}", taskLists.size()); + List runningTaskConfig = Lists.newArrayList(); + List cmdConfigs = sourceCmdConfigMapper.queryCmdByAgentIp(request.getAgentIp()).stream() + .map(cmd -> { + CmdConfig cmdConfig = new CmdConfig(); + cmdConfig.setDataTime(cmd.getSpecifiedDataTime()); + cmdConfig.setOp(cmd.getCmdType()); + cmdConfig.setId(cmd.getId()); + cmdConfig.setTaskId(cmd.getTaskId()); + return cmdConfig; + }).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(taskLists)) { + agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId()); + agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity); + return; + } + for (StreamSourceEntity sourceEntity : taskLists) { + int op = sourceEntity.getStatus() % MODULUS_100; + DataConfig dataConfig = getDataConfig(sourceEntity, op); + runningTaskConfig.add(dataConfig); + } + TaskResult taskResult = + TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build(); + String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult)); + taskResult.setMd5(md5); + taskResult.setCode(AgentResponseCode.SUCCESS); + agentTaskConfigEntity.setAgentIp(request.getAgentIp()); + agentTaskConfigEntity.setClusterName(request.getInlongClusterName()); + agentTaskConfigEntity.setTaskParams(objectMapper.writeValueAsString(taskResult)); + + LOGGER.debug("begin to get agent config info for {}", request); + Set tagSet = new HashSet<>(16); + InlongGroupEntity groupEntity = + groupMapper.selectByGroupIdWithoutTenant(request.getInlongGroupId()); + String clusterTag = groupEntity.getInlongClusterTag(); + InlongClusterEntity agentClusterInfo = clusterMapper.selectByNameAndType(request.getInlongClusterName(), + ClusterType.AGENT); + AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder() + .cluster(AgentConfigInfo.AgentClusterInfo.builder() + .parentId(agentClusterInfo.getId()) + .clusterName(agentClusterInfo.getName()) + .build()) + .build(); + if (StringUtils.isNotBlank(clusterTag)) { + tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA))); + List clusterTagList = new ArrayList<>(tagSet); + ClusterPageRequest pageRequest = ClusterPageRequest.builder() + .type(ClusterType.AGENT_ZK) + .clusterTagList(clusterTagList) + .build(); + List agentZkCluster = clusterMapper.selectByCondition(pageRequest); + if (CollectionUtils.isNotEmpty(agentZkCluster)) { + agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl()); + } + } + + String jsonStr = GSON.toJson(agentConfigInfo); + String configMd5 = DigestUtils.md5Hex(jsonStr); + agentConfigInfo.setMd5(configMd5); + agentConfigInfo.setCode(AgentResponseCode.SUCCESS); + agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo)); + agentClusterInfo.setModifier(operator); + if (existEntity == null) { + agentTaskConfigEntity.setCreator(operator); + agentTaskConfigEntityMapper.insert(agentTaskConfigEntity); + } else { + agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity); + } + LOGGER.debug("success to update agent config info for: {}, result: {}", request, agentConfigInfo); + } catch (Exception e) { + String errMsg = String.format("update agent task config failed for groupId=%s, streamId=%s, ip=%s", + request.getInlongGroupId(), request.getInlongStreamId(), request.getAgentIp()); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + + private DataConfig getDataConfig(StreamSourceEntity entity, int op) { + DataConfig dataConfig = new DataConfig(); + dataConfig.setIp(entity.getAgentIp()); + dataConfig.setUuid(entity.getUuid()); + dataConfig.setOp(String.valueOf(op)); + dataConfig.setTaskId(entity.getId()); + dataConfig.setTaskType(getTaskType(entity)); + dataConfig.setTaskName(entity.getSourceName()); + dataConfig.setSnapshot(entity.getSnapshot()); + dataConfig.setTimeZone(entity.getDataTimeZone()); + dataConfig.setVersion(entity.getVersion()); + + String groupId = entity.getInlongGroupId(); + String streamId = entity.getInlongStreamId(); + dataConfig.setInlongGroupId(groupId); + dataConfig.setInlongStreamId(streamId); + + InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId); + InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); + String extParams = getExtParams(entity); + if (groupEntity != null && streamEntity != null) { + dataConfig.setState( + SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) + ? TaskStateEnum.RUNNING.getType() + : TaskStateEnum.FROZEN.getType()); + dataConfig.setSyncSend(streamEntity.getSyncSend()); + if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) { + String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())); + FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class); + if (Objects.nonNull(fileSourceDTO)) { + fileSourceDTO.setDataSeparator(dataSeparator); + dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion()); + fileSourceDTO.setDataContentStyle(streamEntity.getDataType()); + extParams = JsonUtils.toJsonString(fileSourceDTO); + } + } + InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new); + // Processing extParams + unpackExtParams(streamEntity.getExtParams(), streamInfo); + dataConfig.setPredefinedFields(streamInfo.getPredefinedFields()); + + int dataReportType = groupEntity.getDataReportType(); + dataConfig.setDataReportType(dataReportType); + if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) { + // add mq cluster setting + List mqSet = new ArrayList<>(); + List clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag()); + ClusterPageRequest pageRequest = ClusterPageRequest.builder() + .type(groupEntity.getMqType()) + .clusterTagList(clusterTagList) + .build(); + List mqClusterList = clusterMapper.selectByCondition(pageRequest); + for (InlongClusterEntity cluster : mqClusterList) { + MQClusterInfo clusterInfo = new MQClusterInfo(); + clusterInfo.setUrl(cluster.getUrl()); + clusterInfo.setToken(cluster.getToken()); + clusterInfo.setMqType(cluster.getType()); + clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), HashMap.class)); + mqSet.add(clusterInfo); + } + dataConfig.setMqClusters(mqSet); + + // add topic setting + String mqResource = groupEntity.getMqResource(); + String mqType = groupEntity.getMqType(); + if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) { + // first get the tenant from the InlongGroup, and then get it from the PulsarCluster. + InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); + String tenant = pulsarDTO.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + // If there are multiple Pulsar clusters, take the first one. + // Note that the tenants in multiple Pulsar clusters must be identical. + PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson( + mqClusterList.get(0).getExtParams()); + tenant = pulsarCluster.getPulsarTenant(); + } + + String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT, + tenant, mqResource, streamEntity.getMqResource()); + DataProxyTopicInfo topicConfig = new DataProxyTopicInfo(); + topicConfig.setInlongGroupId(groupId + "/" + streamId); + topicConfig.setTopic(topic); + dataConfig.setTopicInfo(topicConfig); + } else if (MQType.TUBEMQ.equals(mqType)) { + DataProxyTopicInfo topicConfig = new DataProxyTopicInfo(); + topicConfig.setInlongGroupId(groupId); + topicConfig.setTopic(mqResource); + dataConfig.setTopicInfo(topicConfig); + } else if (MQType.KAFKA.equals(mqType)) { + DataProxyTopicInfo topicConfig = new DataProxyTopicInfo(); + topicConfig.setInlongGroupId(groupId); + topicConfig.setTopic(groupEntity.getMqResource() + DOT + streamEntity.getMqResource()); + dataConfig.setTopicInfo(topicConfig); + } + } else { + LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId); + } + } + dataConfig.setExtParams(extParams); + return dataConfig; + } + + private int getTaskType(StreamSourceEntity sourceEntity) { + TaskTypeEnum taskType = SourceType.SOURCE_TASK_MAP.get(sourceEntity.getSourceType()); + if (taskType == null) { + throw new BusinessException("Unsupported task type for source type " + sourceEntity.getSourceType()); + } + return taskType.getType(); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index 997f39b867e..e820140ae67 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -136,4 +136,12 @@ default Map> getSourcesMap(InlongGroupInfo groupInfo, */ Integer addDataAddTask(DataAddTaskRequest request, String operator); + /** + * Update the agent task config info. + * + * @param request source request + * @param operator name of the operator + */ + void updateAgentTaskConfig(SourceRequest request, String operator); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java index a76328ee4dd..37953f12760 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java @@ -116,7 +116,7 @@ public TemplateInfo get(String templateName, String operator) { TemplateEntity templateEntity = templateEntityMapper.selectByName(templateName); if (templateEntity == null) { LOGGER.error("inlong template not found by template name={}", templateName); - throw new BusinessException(ErrorCodeEnum.TEMPLATE_INFO_INCORRECT); + throw new BusinessException(ErrorCodeEnum.TEMPLATE_NOT_FOUND); } TemplateInfo templateInfo = CommonBeanUtils.copyProperties(templateEntity, TemplateInfo::new); @@ -182,7 +182,7 @@ public Boolean update(TemplateRequest request, String operator) { TemplateEntity templateEntity = templateEntityMapper.selectByName(templateName); if (templateEntity == null) { LOGGER.error("inlong template not found by template name={}", templateName); - throw new BusinessException(ErrorCodeEnum.TEMPLATE_INFO_INCORRECT); + throw new BusinessException(ErrorCodeEnum.TEMPLATE_NOT_FOUND); } if (templateEntity.getInCharges().contains(operator)) { diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 8a0bcbf9c92..531a639d4a9 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -901,6 +901,26 @@ CREATE TABLE IF NOT EXISTS `cluster_config` UNIQUE KEY `unique_clustert_config_sink_id` (`cluster_tag`, `is_deleted`) ); +-- ---------------------------- +-- Table structure for agent_task_config +-- ---------------------------- +CREATE TABLE IF NOT EXISTS `agent_task_config` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `config_params` text DEFAULT NULL COMMENT 'The agent config params', + `task_params` text NOT NULL COMMENT 'The agent task config params', + `agent_ip` varchar(128) NOT NULL COMMENT 'agent ip', + `cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name', + `creator` varchar(128) DEFAULT NULL COMMENT 'Creator', + `modifier` varchar(128) DEFAULT NULL COMMENT 'Modifier name', + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete', + `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`, `cluster_name`, `is_deleted`) +); + -- ---------------------------- -- Table structure for template -- ---------------------------- diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index dfb6420dc9e..430982df285 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -952,6 +952,27 @@ CREATE TABLE IF NOT EXISTS `cluster_config` ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'cluster_config'; +-- ---------------------------- +-- Table structure for agent_task_config +-- ---------------------------- +CREATE TABLE IF NOT EXISTS `agent_task_config` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `config_params` text DEFAULT NULL COMMENT 'The agent config params', + `task_params` text NOT NULL COMMENT 'The agent task config params', + `agent_ip` varchar(128) NOT NULL COMMENT 'agent ip', + `cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name', + `creator` varchar(128) DEFAULT NULL COMMENT 'Creator', + `modifier` varchar(128) DEFAULT NULL COMMENT 'Modifier name', + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete', + `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`, `cluster_name`, `is_deleted`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT = 'agent_task_config'; + -- ---------------------------- -- Table structure for template -- ---------------------------- diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql b/inlong-manager/manager-web/sql/changes-1.13.0.sql index 5000782285f..166c98f1998 100644 --- a/inlong-manager/manager-web/sql/changes-1.13.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql @@ -118,3 +118,24 @@ CREATE TABLE IF NOT EXISTS `schedule_config` ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config'; -- ---------------------------- + +-- ---------------------------- +-- Table structure for agent_task_config +-- ---------------------------- +CREATE TABLE IF NOT EXISTS `agent_task_config` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `config_params` text DEFAULT NULL COMMENT 'The agent config params', + `task_params` text NOT NULL COMMENT 'The agent task config params', + `agent_ip` varchar(128) NOT NULL COMMENT 'agent ip', + `cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name', + `creator` varchar(128) DEFAULT NULL COMMENT 'Creator', + `modifier` varchar(128) DEFAULT NULL COMMENT 'Modifier name', + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete', + `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`, `cluster_name`, `is_deleted`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT = 'agent_task_config';