diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java index 92ac0529f4f..a2fff7885cc 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java @@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants { public static final String JOB_MQ_TOPIC = "job.topicInfo"; // File job - public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger"; + public static final String JOB_FILE_TRIGGER = "job.fileJob.trigger"; public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns"; public static final String JOB_DIR_FILTER_BLACKLIST = "job.fileJob.dir.blackList"; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java index b62413b0323..5b58315b435 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java @@ -35,7 +35,7 @@ import java.io.Closeable; -import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER; import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE; /** @@ -87,7 +87,7 @@ public void storeJobConf(JobProfile jobProfile) { // store job conf to bdb if (jobProfile != null) { // trigger job is a special kind of job - if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) { + if (jobProfile.hasKey(JOB_FILE_TRIGGER)) { triggerManager.submitTrigger( TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), true); } else { @@ -123,7 +123,7 @@ public void storeAgentConf(String confJsonStr) { */ public void deleteJobConf(JobProfile jobProfile) { if (jobProfile != null) { - if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) { + if (jobProfile.hasKey(JOB_FILE_TRIGGER)) { triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(), false); } else { jobManager.deleteJob(jobProfile.getInstanceId(), false); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java index c1e933d8aea..5cfca599abb 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java @@ -17,6 +17,27 @@ package org.apache.inlong.agent.core.job; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; +import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; +import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; @@ -31,32 +52,9 @@ import org.apache.inlong.agent.utils.GsonUtil; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.metric.MetricRegister; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; -import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; -import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID; -import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; - /** * JobManager maintains lots of jobs, and communicate between server and task manager. */ @@ -108,45 +106,44 @@ public JobManager(AgentManager agentManager, JobProfileDb jobProfileDb) { this.dimensions = new HashMap<>(); this.dimensions.put(KEY_COMPONENT_NAME, this.getClass().getSimpleName()); this.jobMetrics = new AgentMetricItemSet(this.getClass().getSimpleName()); - MetricRegister.unregister(jobMetrics); MetricRegister.register(jobMetrics); } /** - * submit job to work thread. + * add file job profile * - * @param job job + * @param profile job profile. */ - private void addJob(Job job) { - if (pendingJobs.containsKey(job.getJobInstanceId())) { - return; - } - try { - JobWrapper jobWrapper = new JobWrapper(agentManager, job); - JobWrapper jobWrapperRet = jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper); - if (jobWrapperRet != null) { - LOGGER.warn("{} has been added to running pool, " - + "cannot be added repeatedly", job.getJobInstanceId()); - return; - } else { - getJobMetric().jobRunningCount.incrementAndGet(); - } - this.runningPool.execute(jobWrapper); - } catch (Exception rje) { - LOGGER.debug("reject job {}", job.getJobInstanceId(), rje); - pendingJobs.putIfAbsent(job.getJobInstanceId(), job); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } + public boolean submitFileJobProfile(JobProfile profile) { + return submitJobProfile(profile, false, true); } /** - * add file job profile + * make up file job * * @param profile job profile. */ - public boolean submitFileJobProfile(JobProfile profile) { - return submitJobProfile(profile, false, true); + public void makeUpJob(JobProfile profile, boolean singleJob) { + LOGGER.error("need to make up job {}", profile); + if (!isJobValid(profile)) { + LOGGER.error("make up job failed, invalid profile {}", profile); + return; + } + String jobId = profile.get(JOB_ID); + if (singleJob) { + profile.set(JOB_INSTANCE_ID, AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId)); + } else { + profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet())); + } + JobProfile jobFromDb = jobProfileDb.getJobById(profile.getInstanceId()); + if (jobFromDb == null) { + jobProfileDb.storeJobFirstTime(profile); + } else { + jobFromDb.set(JOB_VERSION, profile.get(JOB_VERSION)); + profile = jobFromDb; + } + LOGGER.info("submit job final profile {}", profile.toJsonStr()); + addJobToMemory(new Job(profile)); } /** @@ -177,10 +174,38 @@ public boolean submitJobProfile(JobProfile profile, boolean singleJob, boolean i } } LOGGER.info("submit job final profile {}", profile.toJsonStr()); - addJob(new Job(profile)); + addJobToMemory(new Job(profile)); return true; } + /** + * submit job to work thread. + * + * @param job job + */ + private void addJobToMemory(Job job) { + if (pendingJobs.containsKey(job.getJobInstanceId())) { + return; + } + try { + JobWrapper jobWrapper = new JobWrapper(agentManager, job); + JobWrapper jobWrapperRet = jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper); + if (jobWrapperRet != null) { + LOGGER.warn("{} has been added to running pool, " + + "cannot be added repeatedly", job.getJobInstanceId()); + return; + } else { + getJobMetric().jobRunningCount.incrementAndGet(); + } + this.runningPool.execute(jobWrapper); + } catch (Exception rje) { + LOGGER.debug("reject job {}", job.getJobInstanceId(), rje); + pendingJobs.putIfAbsent(job.getJobInstanceId(), job); + } catch (Throwable t) { + ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); + } + } + private boolean isJobValid(JobProfile profile) { if (profile == null || !profile.allRequiredKeyExist()) { LOGGER.error("profile is null or not all required key exists {}", profile == null ? null @@ -227,7 +252,7 @@ private void startJobs() { List profileList = jobProfileDb.getRestartJobs(); for (JobProfile profile : profileList) { LOGGER.info("init starting job from db {}", profile.toJsonStr()); - addJob(new Job(profile)); + addJobToMemory(new Job(profile)); } } @@ -242,7 +267,7 @@ public Runnable jobStateCheckThread() { for (String jobId : pendingJobs.keySet()) { Job job = pendingJobs.remove(jobId); if (job != null) { - addJob(job); + addJobToMemory(job); } } TimeUnit.SECONDS.sleep(monitorInterval); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java index 29a496a6f58..224f1d95674 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java @@ -17,6 +17,21 @@ package org.apache.inlong.agent.core.job; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; +import static org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; @@ -35,26 +50,9 @@ import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.constant.Constants; import org.apache.inlong.common.db.CommandEntity; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; -import static org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER; - /** * JobWrapper is used in JobManager, it defines the life cycle of * running job and maintains the state of job. @@ -204,6 +202,7 @@ public void submit(JobProfile taskProfile) { public void cleanup() { isEnd = true; allTasks.forEach(task -> taskManager.removeTask(task.getTaskId())); + allTasks.clear(); } @Override diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java index 122415a683b..3c883ad683e 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java @@ -17,6 +17,16 @@ package org.apache.inlong.agent.core.trigger; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM; +import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; @@ -28,21 +38,9 @@ import org.apache.inlong.agent.db.TriggerProfileDb; import org.apache.inlong.agent.plugin.Trigger; import org.apache.inlong.agent.utils.ThreadUtils; - -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM; -import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; - /** * manager for triggers. */ @@ -67,13 +65,13 @@ public TriggerManager(AgentManager manager, TriggerProfileDb triggerProfileDb) { } /** - * Restore trigger task. + * add trigger task to memory. * * @param triggerProfile trigger profile */ - public boolean restoreTrigger(TriggerProfile triggerProfile) { + public boolean addTriggerToMemory(TriggerProfile triggerProfile) { try { - Class triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER)); + Class triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_TRIGGER)); Trigger trigger = (Trigger) triggerClass.newInstance(); String triggerId = triggerProfile.get(JOB_ID); if (triggerMap.containsKey(triggerId)) { @@ -120,7 +118,7 @@ public void submitTrigger(TriggerProfile triggerProfile, boolean isNewJob) { // This action must be done before saving in db, because the job.instance.id is needed for the next recovery manager.getJobManager().submitJobProfile(triggerProfile, true, isNewJob); triggerProfileDB.storeTrigger(triggerProfile); - restoreTrigger(triggerProfile); + addTriggerToMemory(triggerProfile); } /** @@ -160,6 +158,7 @@ private Runnable jobFetchThread() { JobWrapper job = jobWrapperMap.get(trigger.getTriggerProfile().getInstanceId()); if (job == null) { LOGGER.error("job {} should not be null", trigger.getTriggerProfile().getInstanceId()); + manager.getJobManager().makeUpJob(trigger.getTriggerProfile(), true); return; } String subTaskFile = profile.getOrDefault(JobConstants.JOB_DIR_FILTER_PATTERNS, ""); @@ -171,7 +170,7 @@ private Runnable jobFetchThread() { // necessary to filter the stated monitored file task. boolean alreadyExistTask = job.exist(tasks -> tasks.stream() - .filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER)) + .filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_TRIGGER)) .filter(task -> subTaskFile.equals( task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, ""))) .findAny().isPresent()); @@ -202,7 +201,7 @@ private void initTriggers() { // fetch all triggers from db List profileList = triggerProfileDB.getTriggers(); for (TriggerProfile profile : profileList) { - restoreTrigger(profile); + addTriggerToMemory(profile); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index ac4939b147f..d4269740b47 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -17,43 +17,6 @@ package org.apache.inlong.agent.plugin.fetcher; -import org.apache.inlong.agent.common.AbstractDaemon; -import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.conf.ProfileFetcher; -import org.apache.inlong.agent.conf.TriggerProfile; -import org.apache.inlong.agent.core.AgentManager; -import org.apache.inlong.agent.db.CommandDb; -import org.apache.inlong.agent.plugin.Trigger; -import org.apache.inlong.agent.plugin.utils.PluginUtils; -import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest; -import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto; -import org.apache.inlong.agent.pojo.DbCollectorTaskResult; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.HttpManager; -import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.db.CommandEntity; -import org.apache.inlong.common.enums.ManagerOpEnum; -import org.apache.inlong.common.enums.PullJobTypeEnum; -import org.apache.inlong.common.pojo.agent.CmdConfig; -import org.apache.inlong.common.pojo.agent.TaskRequest; -import org.apache.inlong.common.pojo.agent.TaskResult; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - import static java.util.Objects.requireNonNull; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID; @@ -75,7 +38,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.VERSION; -import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER; import static org.apache.inlong.agent.constant.JobConstants.JOB_OP; import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME; import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData; @@ -83,6 +46,41 @@ import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp; import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.ProfileFetcher; +import org.apache.inlong.agent.conf.TriggerProfile; +import org.apache.inlong.agent.core.AgentManager; +import org.apache.inlong.agent.db.CommandDb; +import org.apache.inlong.agent.plugin.Trigger; +import org.apache.inlong.agent.plugin.utils.PluginUtils; +import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest; +import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto; +import org.apache.inlong.agent.pojo.DbCollectorTaskResult; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.HttpManager; +import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.db.CommandEntity; +import org.apache.inlong.common.enums.ManagerOpEnum; +import org.apache.inlong.common.enums.PullJobTypeEnum; +import org.apache.inlong.common.pojo.agent.CmdConfig; +import org.apache.inlong.common.pojo.agent.TaskRequest; +import org.apache.inlong.common.pojo.agent.TaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Fetch command from Inlong-Manager */ @@ -95,7 +93,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { private static final int MAX_RETRY = 2; private final String managerVipUrl; private final String baseManagerUrl; - private final String managerTaskUrl; + private final String fetchAndReportFileTaskUrl; private final String managerIpsCheckUrl; private final String managerDbCollectorTaskUrl; private final AgentConfiguration conf; @@ -115,7 +113,7 @@ public ManagerFetcher(AgentManager agentManager) { httpManager = new HttpManager(conf); baseManagerUrl = buildBaseUrl(); managerVipUrl = buildVipUrl(baseManagerUrl); - managerTaskUrl = buildFileCollectTaskUrl(baseManagerUrl); + fetchAndReportFileTaskUrl = buildFileCollectTaskUrl(baseManagerUrl); managerIpsCheckUrl = buildIpCheckUrl(baseManagerUrl); managerDbCollectorTaskUrl = buildDbCollectorGetTaskUrl(baseManagerUrl); uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID); @@ -132,18 +130,18 @@ private boolean requiredKeys(AgentConfiguration conf) { /** * build base url for manager according to config - *

+ * * example - http://127.0.0.1:8080/inlong/manager/openapi */ private String buildBaseUrl() { return "http://" + conf.get(AGENT_MANAGER_VIP_HTTP_HOST) + ":" + conf.get(AGENT_MANAGER_VIP_HTTP_PORT) + conf.get( - AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH); + AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH); } /** * build vip url for manager according to config - *

+ * * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getManagerIpList */ private String buildVipUrl(String baseUrl) { @@ -152,7 +150,7 @@ private String buildVipUrl(String baseUrl) { /** * build file collect task url for manager according to config - *

+ * * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf */ private String buildFileCollectTaskUrl(String baseUrl) { @@ -161,7 +159,7 @@ private String buildFileCollectTaskUrl(String baseUrl) { /** * build ip check url for manager according to config - *

+ * * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/confirmAgentIp */ private String buildIpCheckUrl(String baseUrl) { @@ -170,7 +168,7 @@ private String buildIpCheckUrl(String baseUrl) { /** * build db collector get task url for manager according to config - *

+ * * example - http://127.0.0.1:8080/inlong/manager/openapi/dbcollector/getTask */ private String buildDbCollectorGetTaskUrl(String baseUrl) { @@ -209,7 +207,8 @@ public List requestTdmList() { public void fetchCommand() { LOGGER.info("fetchCommand start"); List unackedCommands = commandDb.getUnackedCommands(); - String resultStr = httpManager.doSentPost(managerTaskUrl, getFetchRequest(unackedCommands)); + String resultStr = httpManager.doSentPost(fetchAndReportFileTaskUrl, + getFetchRequest(unackedCommands)); JsonObject resultData = getResultData(resultStr); JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA); if (element != null) { @@ -267,7 +266,7 @@ private void dealWithFetchResult(TaskResult taskResult) { .map(TriggerProfile::getTriggerProfiles) .forEach(profile -> { LOGGER.info("the triggerProfile: {}", profile.toJsonStr()); - if (profile.hasKey(JOB_FILE_JOB_TRIGGER)) { + if (profile.hasKey(JOB_FILE_TRIGGER)) { dealWithFileTriggerProfile(profile); } else { dealWithJobProfile(profile); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java index 59fbc0b7245..4dfae0c06b8 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java @@ -36,7 +36,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX; import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER; import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT; -import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER; import static org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN; import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT; @@ -53,7 +53,7 @@ public TextFileSource() { @Override public List split(JobProfile jobConf) { super.init(jobConf); - if (jobConf.hasKey(JOB_FILE_JOB_TRIGGER)) { + if (jobConf.hasKey(JOB_FILE_TRIGGER)) { // trigger as a special reader. return Collections.singletonList(new TriggerFileReader()); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java index 3639eaf95ee..23d2311ae78 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java @@ -81,7 +81,7 @@ public boolean isSourceExist() { @Override public void init(JobProfile jobConf) { - this.triggerId = jobConf.get(JobConstants.JOB_FILE_JOB_TRIGGER); + this.triggerId = jobConf.get(JobConstants.JOB_FILE_TRIGGER); } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java index 7ce8171c7fd..38e688713a5 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java @@ -295,7 +295,7 @@ private void registerAllSubDir( Map taskProfile = new HashMap<>(); String md5 = AgentUtils.getFileMd5(path.toFile()); taskProfile.put(path.toFile().getAbsolutePath() + ".md5", md5); - taskProfile.put(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id + taskProfile.put(JobConstants.JOB_FILE_TRIGGER, null); // del trigger id taskProfile.put(JobConstants.JOB_DIR_FILTER_PATTERNS, path.toFile().getAbsolutePath()); LOGGER.info("trigger_{} generate job profile to read file {}", trigger.getTriggerProfile().getTriggerId(), path); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java index 91412a90a1d..6d84f6d5204 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java @@ -129,7 +129,7 @@ public static JobProfile copyJobProfile(TriggerProfile triggerProfile, File pend JobProfile copiedProfile = TriggerProfile.parseJsonStr(triggerProfile.toJsonStr()); String md5 = AgentUtils.getFileMd5(pendingFile); copiedProfile.set(pendingFile.getAbsolutePath() + ".md5", md5); - copiedProfile.set(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id + copiedProfile.set(JobConstants.JOB_FILE_TRIGGER, null); // del trigger id copiedProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, pendingFile.getAbsolutePath()); return copiedProfile; }