diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java index 79c1fd4a30a..92ac0529f4f 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java @@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants { public static final String JOB_MQ_TOPIC = "job.topicInfo"; // File job - public static final String JOB_TRIGGER = "job.fileJob.trigger"; + public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger"; public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns"; public static final String JOB_DIR_FILTER_BLACKLIST = "job.fileJob.dir.blackList"; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java index 11f9ce793e2..c3e2344b272 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java @@ -19,9 +19,7 @@ import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.conf.ProfileFetcher; -import org.apache.inlong.agent.conf.TriggerProfile; import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.core.conf.ConfigJetty; import org.apache.inlong.agent.core.job.JobManager; @@ -31,7 +29,6 @@ import org.apache.inlong.agent.db.CommandDb; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.JobProfileDb; -import org.apache.inlong.agent.db.LocalProfile; import org.apache.inlong.agent.db.TriggerProfileDb; import org.slf4j.Logger; @@ -39,14 +36,9 @@ import java.io.File; import java.lang.reflect.Constructor; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT; -import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; - /** * Agent Manager, the bridge for job manager, task manager, db e.t.c it manages agent level operations and communicates * with outside system. @@ -63,7 +55,6 @@ public class AgentManager extends AbstractDaemon { private final AgentConfiguration conf; private final ExecutorService agentConfMonitor; private final Db db; - private final LocalProfile localProfile; private final CommandDb commandDb; private final JobProfileDb jobProfileDb; // jetty for config operations via http. @@ -75,8 +66,6 @@ public AgentManager() { this.db = initDb(); commandDb = new CommandDb(db); jobProfileDb = new JobProfileDb(db); - String parentConfPath = conf.get(AGENT_CONF_PARENT, DEFAULT_AGENT_CONF_PARENT); - localProfile = new LocalProfile(parentConfPath); triggerManager = new TriggerManager(this, new TriggerProfileDb(db)); jobManager = new JobManager(this, jobProfileDb); taskManager = new TaskManager(this); @@ -208,20 +197,6 @@ public void start() throws Exception { LOGGER.info("starting task position manager"); positionManager.start(); LOGGER.info("starting read job from local"); - // read job profiles from local - List profileList = localProfile.readFromLocal(); - for (JobProfile profile : profileList) { - if (profile.hasKey(JOB_TRIGGER)) { - TriggerProfile triggerProfile = TriggerProfile.parseJobProfile(profile); - // there is no need to store this profile in triggerDB, because - // this profile comes from local file. - triggerManager.restoreTrigger(triggerProfile); - } else { - // job db store instance info, so it's suitable to use submitJobProfile - // to store instance into job db. - jobManager.submitFileJobProfile(profile); - } - } LOGGER.info("starting fetcher"); if (fetcher != null) { fetcher.start(); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java index 34e40f8fff0..b62413b0323 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java @@ -35,8 +35,8 @@ import java.io.Closeable; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER; import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE; -import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; /** * start http server and get job/agent config via http @@ -87,7 +87,7 @@ public void storeJobConf(JobProfile jobProfile) { // store job conf to bdb if (jobProfile != null) { // trigger job is a special kind of job - if (jobProfile.hasKey(JOB_TRIGGER)) { + if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) { triggerManager.submitTrigger( TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), true); } else { @@ -123,7 +123,7 @@ public void storeAgentConf(String confJsonStr) { */ public void deleteJobConf(JobProfile jobProfile) { if (jobProfile != null) { - if (jobProfile.hasKey(JOB_TRIGGER)) { + if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) { triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(), false); } else { jobManager.deleteJob(jobProfile.getInstanceId(), false); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java index c5c005b83f2..122415a683b 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java @@ -73,7 +73,7 @@ public TriggerManager(AgentManager manager, TriggerProfileDb triggerProfileDb) { */ public boolean restoreTrigger(TriggerProfile triggerProfile) { try { - Class triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_TRIGGER)); + Class triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER)); Trigger trigger = (Trigger) triggerClass.newInstance(); String triggerId = triggerProfile.get(JOB_ID); if (triggerMap.containsKey(triggerId)) { @@ -171,7 +171,7 @@ private Runnable jobFetchThread() { // necessary to filter the stated monitored file task. boolean alreadyExistTask = job.exist(tasks -> tasks.stream() - .filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_TRIGGER)) + .filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER)) .filter(task -> subTaskFile.equals( task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, ""))) .findAny().isPresent()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index 350104e698b..ac4939b147f 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -75,9 +75,9 @@ import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.VERSION; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER; import static org.apache.inlong.agent.constant.JobConstants.JOB_OP; import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME; -import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData; import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile; import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp; @@ -267,8 +267,8 @@ private void dealWithFetchResult(TaskResult taskResult) { .map(TriggerProfile::getTriggerProfiles) .forEach(profile -> { LOGGER.info("the triggerProfile: {}", profile.toJsonStr()); - if (profile.hasKey(JOB_TRIGGER)) { - dealWithTdmTriggerProfile(profile); + if (profile.hasKey(JOB_FILE_JOB_TRIGGER)) { + dealWithFileTriggerProfile(profile); } else { dealWithJobProfile(profile); } @@ -377,7 +377,7 @@ private boolean makeUpFiles(TriggerProfile triggerProfile, String dataTime) { /** * the trigger profile returned from manager should be parsed */ - public void dealWithTdmTriggerProfile(TriggerProfile triggerProfile) { + public void dealWithFileTriggerProfile(TriggerProfile triggerProfile) { ManagerOpEnum opType = ManagerOpEnum.getOpType(triggerProfile.getInt(JOB_OP)); boolean success = true; try { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java index d680d49f0bf..59fbc0b7245 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java @@ -36,9 +36,9 @@ import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX; import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER; import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER; import static org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN; import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT; -import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; /** * Read text files @@ -53,7 +53,7 @@ public TextFileSource() { @Override public List split(JobProfile jobConf) { super.init(jobConf); - if (jobConf.hasKey(JOB_TRIGGER)) { + if (jobConf.hasKey(JOB_FILE_JOB_TRIGGER)) { // trigger as a special reader. return Collections.singletonList(new TriggerFileReader()); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java index f87de408cc9..3639eaf95ee 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java @@ -81,7 +81,7 @@ public boolean isSourceExist() { @Override public void init(JobProfile jobConf) { - this.triggerId = jobConf.get(JobConstants.JOB_TRIGGER); + this.triggerId = jobConf.get(JobConstants.JOB_FILE_JOB_TRIGGER); } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java index d78f8150e39..7ce8171c7fd 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java @@ -295,7 +295,7 @@ private void registerAllSubDir( Map taskProfile = new HashMap<>(); String md5 = AgentUtils.getFileMd5(path.toFile()); taskProfile.put(path.toFile().getAbsolutePath() + ".md5", md5); - taskProfile.put(JobConstants.JOB_TRIGGER, null); // del trigger id + taskProfile.put(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id taskProfile.put(JobConstants.JOB_DIR_FILTER_PATTERNS, path.toFile().getAbsolutePath()); LOGGER.info("trigger_{} generate job profile to read file {}", trigger.getTriggerProfile().getTriggerId(), path); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java index 954bdc1a94f..91412a90a1d 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java @@ -129,7 +129,7 @@ public static JobProfile copyJobProfile(TriggerProfile triggerProfile, File pend JobProfile copiedProfile = TriggerProfile.parseJsonStr(triggerProfile.toJsonStr()); String md5 = AgentUtils.getFileMd5(pendingFile); copiedProfile.set(pendingFile.getAbsolutePath() + ".md5", md5); - copiedProfile.set(JobConstants.JOB_TRIGGER, null); // del trigger id + copiedProfile.set(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id copiedProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, pendingFile.getAbsolutePath()); return copiedProfile; }