Skip to content

Commit

Permalink
[INLONG-10558][Manager] Support determining whether to issue agent ta…
Browse files Browse the repository at this point in the history
…sks based on the MD5 value (apache#10559)

* [INLONG-10558][Manager] Support determining whether to issue agent tasks based on the MD5 value
  • Loading branch information
fuweng11 authored Jul 4, 2024
1 parent 88595f8 commit f112ec0
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.pojo.agent.AgentResponseCode;
import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
import org.apache.inlong.common.pojo.agent.installer.InstallerCode;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand Down Expand Up @@ -124,7 +124,7 @@ private Runnable configFetchThread() {
while (isRunnable()) {
try {
ConfigResult config = getConfig();
if (config != null && config.getCode().equals(InstallerCode.SUCCESS)) {
if (config != null && config.getCode().equals(AgentResponseCode.SUCCESS)) {
manager.getModuleManager().submitConfig(config);
}
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
* The Agent config info.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AgentConfigInfo {

AgentResponseCode code;
private String zkUrl;

private AgentClusterInfo cluster;
private String md5;

@Data
@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ public class AgentConfigRequest {
private String clusterName;

private String ip;

private String md5;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.inlong.common.pojo.agent.installer;
package org.apache.inlong.common.pojo.agent;

public enum InstallerCode {
public enum AgentResponseCode {

SUCCESS(0, "SUCCESS", "Get module config success"),
NO_UPDATE(1, "NO_UPDATE", "No update"),
Expand All @@ -27,16 +27,16 @@ public enum InstallerCode {
private final String name;
private final String desc;

InstallerCode(int id, String name, String desc) {
AgentResponseCode(int id, String name, String desc) {
this.id = id;
this.name = name;
this.desc = desc;
}

public static InstallerCode valueOf(int value) {
for (InstallerCode installerCode : InstallerCode.values()) {
if (installerCode.getId() == value) {
return installerCode;
public static AgentResponseCode valueOf(int value) {
for (AgentResponseCode agentResponseCode : AgentResponseCode.values()) {
if (agentResponseCode.getId() == value) {
return agentResponseCode;
}
}
return UNKNOWN_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class TaskRequest {

private int pullJobType;

private String md5;

private List<CommandEntity> commandInfo = new ArrayList<>();

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ public class TaskResult {

private List<CmdConfig> cmdConfigs;
private List<DataConfig> dataConfigs;
private String md5;
AgentResponseCode code;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.common.pojo.agent.installer;

import org.apache.inlong.common.pojo.agent.AgentResponseCode;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -36,7 +38,7 @@ public class ConfigResult {
/**
* The code of the config result
*/
InstallerCode code;
AgentResponseCode code;

/**
* The md5 of the config result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
import org.apache.inlong.common.pojo.agent.AgentResponseCode;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
import org.apache.inlong.common.pojo.agent.installer.InstallerCode;
import org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
import org.apache.inlong.common.pojo.agent.installer.PackageConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
Expand Down Expand Up @@ -145,7 +145,10 @@ public class AgentServiceImpl implements AgentService {
new CallerRunsPolicy());

@Getter
private LoadingCache<TaskRequest, List<StreamSourceEntity>> taskCache;
private LoadingCache<TaskRequest, TaskResult> taskCache;
@Getter
private LoadingCache<AgentConfigRequest, AgentConfigInfo> agentConfigCache;

@Getter
private LoadingCache<ConfigRequest, ConfigResult> moduleConfigCache;

Expand Down Expand Up @@ -201,6 +204,9 @@ private void startHeartbeatTask() {
taskCache = Caffeine.newBuilder()
.expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
.build(this::fetchTask);
agentConfigCache = Caffeine.newBuilder()
.expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
.build(this::fetchAgentConfig);
LOGGER.debug("start to reload config for installer.");
try {
moduleConfigCache = Caffeine.newBuilder()
Expand Down Expand Up @@ -331,28 +337,18 @@ private void updateTaskStatus(CommandEntity command) {
@Override
public AgentConfigInfo getAgentConfig(AgentConfigRequest request) {
LOGGER.debug("begin to get agent config info for {}", request);
AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
Set<String> tagSet = new HashSet<>(16);
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
List<String> clusterTagList = new ArrayList<>(tagSet);
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
.type(ClusterType.AGENT_ZK)
.clusterTagList(clusterTagList)
.build();
List<InlongClusterEntity> agentZkCluster = clusterMapper.selectByCondition(pageRequest);
if (CollectionUtils.isNotEmpty(agentZkCluster)) {
agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
AgentConfigInfo agentConfigInfo = agentConfigCache.get(request);
if (agentConfigInfo == null) {
return null;
}
if (request.getMd5() == null || !Objects.equals(request.getMd5(), agentConfigInfo.getMd5())) {
return agentConfigInfo;
}

AgentClusterInfo clusterInfo = (AgentClusterInfo) clusterService.getOne(
null, request.getClusterName(), ClusterType.AGENT);
agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
.parentId(clusterInfo.getId())
.clusterName(clusterInfo.getName())
.build());

LOGGER.debug("success to get agent config info for: {}, result: {}", request, agentConfigInfo);
return agentConfigInfo;
return AgentConfigInfo.builder()
.md5(agentConfigInfo.getMd5())
.code(AgentResponseCode.NO_UPDATE)
.build();
}

@Override
Expand All @@ -374,27 +370,19 @@ public TaskResult getTaskResult(TaskRequest request) {
@Override
public TaskResult getExistTaskConfig(TaskRequest request) {
LOGGER.debug("begin to get all exist task by request={}", request);
// Query pending special commands
List<DataConfig> runningTaskConfig = Lists.newArrayList();
List<StreamSourceEntity> sourceEntities = taskCache.get(request);
try {
List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
if (CollectionUtils.isEmpty(sourceEntities)) {
return TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
}
for (StreamSourceEntity sourceEntity : sourceEntities) {
int op = getOp(sourceEntity.getStatus());
DataConfig dataConfig = getDataConfig(sourceEntity, op);
runningTaskConfig.add(dataConfig);
}
TaskResult taskResult = TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();

TaskResult taskResult = taskCache.get(request);
if (taskResult == null) {
return null;
}
if (request.getMd5() == null || !Objects.equals(request.getMd5(), taskResult.getMd5())) {
return taskResult;
} catch (Exception e) {
LOGGER.error("get all exist task failed:", e);
throw new BusinessException("get all exist task failed:" + e.getMessage());
}

return TaskResult.builder()
.dataConfigs(new ArrayList<>())
.cmdConfigs(new ArrayList<>())
.md5(taskResult.getMd5())
.code(AgentResponseCode.NO_UPDATE)
.build();
}

@Override
Expand Down Expand Up @@ -474,7 +462,7 @@ public ConfigResult getConfig(ConfigRequest request) {
if (Objects.equals(request.getMd5(), configResult.getMd5())) {
return ConfigResult.builder()
.md5(configResult.getMd5())
.code(InstallerCode.NO_UPDATE)
.code(AgentResponseCode.NO_UPDATE)
.build();
}
return configResult;
Expand Down Expand Up @@ -837,7 +825,7 @@ private boolean matchGroup(StreamSourceEntity sourceEntity, InlongClusterNodeEnt
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}

private List<StreamSourceEntity> fetchTask(TaskRequest request) {
private TaskResult fetchTask(TaskRequest request) {
final String clusterName = request.getClusterName();
final String ip = request.getAgentIp();
final String uuid = request.getUuid();
Expand All @@ -850,7 +838,55 @@ private List<StreamSourceEntity> fetchTask(TaskRequest request) {
clusterName, ip, uuid);
taskLists.addAll(stopSourceEntities);
LOGGER.debug("success to add task : {}", taskLists.size());
return taskLists;
List<DataConfig> runningTaskConfig = Lists.newArrayList();
try {
List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
if (CollectionUtils.isEmpty(taskLists)) {
return TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
}
for (StreamSourceEntity sourceEntity : taskLists) {
int op = getOp(sourceEntity.getStatus());
DataConfig dataConfig = getDataConfig(sourceEntity, op);
runningTaskConfig.add(dataConfig);
}
TaskResult taskResult = TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult));
taskResult.setMd5(md5);
taskResult.setCode(AgentResponseCode.SUCCESS);
return taskResult;
} catch (Exception e) {
LOGGER.error("get all exist task failed:", e);
throw new BusinessException("get all exist task failed:" + e.getMessage());
}
}

private AgentConfigInfo fetchAgentConfig(AgentConfigRequest request) {
LOGGER.debug("begin to get agent config info for {}", request);
AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
Set<String> tagSet = new HashSet<>(16);
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
List<String> clusterTagList = new ArrayList<>(tagSet);
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
.type(ClusterType.AGENT_ZK)
.clusterTagList(clusterTagList)
.build();
List<InlongClusterEntity> agentZkCluster = clusterMapper.selectByCondition(pageRequest);
if (CollectionUtils.isNotEmpty(agentZkCluster)) {
agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
}

AgentClusterInfo clusterInfo = (AgentClusterInfo) clusterService.getOne(
null, request.getClusterName(), ClusterType.AGENT);
agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
.parentId(clusterInfo.getId())
.clusterName(clusterInfo.getName())
.build());
String jsonStr = GSON.toJson(agentConfigInfo);
String configMd5 = DigestUtils.md5Hex(jsonStr);
agentConfigInfo.setMd5(configMd5);
agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
LOGGER.debug("success to get agent config info for: {}, result: {}", request, agentConfigInfo);
return agentConfigInfo;
}

private ConfigResult loadModuleConfigs(ConfigRequest request) {
Expand All @@ -870,7 +906,7 @@ private ConfigResult loadModuleConfigs(ConfigRequest request) {
String configMd5 = DigestUtils.md5Hex(jsonStr);

ConfigResult configResult = ConfigResult.builder().moduleList(configs).md5(configMd5)
.code(InstallerCode.SUCCESS)
.code(AgentResponseCode.SUCCESS)
.build();
LOGGER.info("success load module config, size = {}", configResult.getModuleList().size());
return configResult;
Expand Down

0 comments on commit f112ec0

Please sign in to comment.