Skip to content

Commit

Permalink
[INLONG-10564][Agent] Request configuration with md5 included
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Jul 4, 2024
1 parent f112ec0 commit 27dbc83
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
import org.apache.inlong.common.pojo.agent.AgentResponseCode;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
Expand Down Expand Up @@ -83,6 +84,8 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
private String uuid;
private String clusterTag;
private String clusterName;
private String taskResultMd5;
private String agentConfigMd5;

public ManagerFetcher(AgentManager agentManager) {
this.agentManager = agentManager;
Expand Down Expand Up @@ -162,6 +165,7 @@ public AgentConfigInfo getAgentConfigInfo() {

public TaskRequest getTaskRequest() {
TaskRequest request = new TaskRequest();
request.setMd5(taskResultMd5);
request.setAgentIp(localIp);
request.setUuid(uuid);
request.setClusterName(clusterName);
Expand All @@ -172,6 +176,7 @@ public TaskRequest getTaskRequest() {

public AgentConfigRequest getAgentConfigInfoRequest() {
AgentConfigRequest request = new AgentConfigRequest();
request.setMd5(agentConfigMd5);
request.setClusterTag(clusterTag);
request.setClusterName(clusterName);
request.setIp(localIp);
Expand All @@ -189,17 +194,19 @@ private Runnable configFetchThread() {
while (isRunnable()) {
try {
TaskResult taskResult = getStaticConfig();
if (taskResult != null) {
if (taskResult != null && taskResult.getCode().equals(AgentResponseCode.SUCCESS)) {
List<TaskProfile> taskProfiles = new ArrayList<>();
taskResult.getDataConfigs().forEach((config) -> {
TaskProfile profile = TaskProfile.convertToTaskProfile(config);
taskProfiles.add(profile);
});
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
taskResultMd5 = taskResult.getMd5();
}
AgentConfigInfo config = getAgentConfigInfo();
if (config != null) {
if (config != null && config.getCode().equals(AgentResponseCode.SUCCESS)) {
agentManager.subNewAgentConfigInfo(config);
agentConfigMd5 = config.getMd5();
}
} catch (Throwable ex) {
LOGGER.warn("exception caught", ex);
Expand Down

0 comments on commit 27dbc83

Please sign in to comment.