Skip to content

Commit

Permalink
[INLONG-8645]delete the capacity of loading trigger from local file
Browse files Browse the repository at this point in the history
the capacity of loading trigger from local file make it hard to operation and maintenance and we never use it online.
  • Loading branch information
justinwwhuang committed Aug 7, 2023
1 parent ee932fa commit 23d08db
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_MQ_TOPIC = "job.topicInfo";

// File job
public static final String JOB_TRIGGER = "job.fileJob.trigger";
public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger";
public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated
public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns";
public static final String JOB_DIR_FILTER_BLACKLIST = "job.fileJob.dir.blackList";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.conf.ConfigJetty;
import org.apache.inlong.agent.core.job.JobManager;
Expand All @@ -31,22 +29,16 @@
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.JobProfileDb;
import org.apache.inlong.agent.db.LocalProfile;
import org.apache.inlong.agent.db.TriggerProfileDb;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;

/**
* Agent Manager, the bridge for job manager, task manager, db e.t.c it manages agent level operations and communicates
* with outside system.
Expand All @@ -63,7 +55,6 @@ public class AgentManager extends AbstractDaemon {
private final AgentConfiguration conf;
private final ExecutorService agentConfMonitor;
private final Db db;
private final LocalProfile localProfile;
private final CommandDb commandDb;
private final JobProfileDb jobProfileDb;
// jetty for config operations via http.
Expand All @@ -75,8 +66,6 @@ public AgentManager() {
this.db = initDb();
commandDb = new CommandDb(db);
jobProfileDb = new JobProfileDb(db);
String parentConfPath = conf.get(AGENT_CONF_PARENT, DEFAULT_AGENT_CONF_PARENT);
localProfile = new LocalProfile(parentConfPath);
triggerManager = new TriggerManager(this, new TriggerProfileDb(db));
jobManager = new JobManager(this, jobProfileDb);
taskManager = new TaskManager(this);
Expand Down Expand Up @@ -208,20 +197,6 @@ public void start() throws Exception {
LOGGER.info("starting task position manager");
positionManager.start();
LOGGER.info("starting read job from local");
// read job profiles from local
List<JobProfile> profileList = localProfile.readFromLocal();
for (JobProfile profile : profileList) {
if (profile.hasKey(JOB_TRIGGER)) {
TriggerProfile triggerProfile = TriggerProfile.parseJobProfile(profile);
// there is no need to store this profile in triggerDB, because
// this profile comes from local file.
triggerManager.restoreTrigger(triggerProfile);
} else {
// job db store instance info, so it's suitable to use submitJobProfile
// to store instance into job db.
jobManager.submitFileJobProfile(profile);
}
}
LOGGER.info("starting fetcher");
if (fetcher != null) {
fetcher.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

import java.io.Closeable;

import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;

/**
* start http server and get job/agent config via http
Expand Down Expand Up @@ -87,7 +87,7 @@ public void storeJobConf(JobProfile jobProfile) {
// store job conf to bdb
if (jobProfile != null) {
// trigger job is a special kind of job
if (jobProfile.hasKey(JOB_TRIGGER)) {
if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
triggerManager.submitTrigger(
TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), true);
} else {
Expand Down Expand Up @@ -123,7 +123,7 @@ public void storeAgentConf(String confJsonStr) {
*/
public void deleteJobConf(JobProfile jobProfile) {
if (jobProfile != null) {
if (jobProfile.hasKey(JOB_TRIGGER)) {
if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(), false);
} else {
jobManager.deleteJob(jobProfile.getInstanceId(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public TriggerManager(AgentManager manager, TriggerProfileDb triggerProfileDb) {
*/
public boolean restoreTrigger(TriggerProfile triggerProfile) {
try {
Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_TRIGGER));
Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER));
Trigger trigger = (Trigger) triggerClass.newInstance();
String triggerId = triggerProfile.get(JOB_ID);
if (triggerMap.containsKey(triggerId)) {
Expand Down Expand Up @@ -171,7 +171,7 @@ private Runnable jobFetchThread() {
// necessary to filter the stated monitored file task.

boolean alreadyExistTask = job.exist(tasks -> tasks.stream()
.filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_TRIGGER))
.filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER))
.filter(task -> subTaskFile.equals(
task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, "")))
.findAny().isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
Expand Down Expand Up @@ -267,8 +267,8 @@ private void dealWithFetchResult(TaskResult taskResult) {
.map(TriggerProfile::getTriggerProfiles)
.forEach(profile -> {
LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
if (profile.hasKey(JOB_TRIGGER)) {
dealWithTdmTriggerProfile(profile);
if (profile.hasKey(JOB_FILE_JOB_TRIGGER)) {
dealWithFileTriggerProfile(profile);
} else {
dealWithJobProfile(profile);
}
Expand Down Expand Up @@ -377,7 +377,7 @@ private boolean makeUpFiles(TriggerProfile triggerProfile, String dataTime) {
/**
* the trigger profile returned from manager should be parsed
*/
public void dealWithTdmTriggerProfile(TriggerProfile triggerProfile) {
public void dealWithFileTriggerProfile(TriggerProfile triggerProfile) {
ManagerOpEnum opType = ManagerOpEnum.getOpType(triggerProfile.getInt(JOB_OP));
boolean success = true;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;

/**
* Read text files
Expand All @@ -53,7 +53,7 @@ public TextFileSource() {
@Override
public List<Reader> split(JobProfile jobConf) {
super.init(jobConf);
if (jobConf.hasKey(JOB_TRIGGER)) {
if (jobConf.hasKey(JOB_FILE_JOB_TRIGGER)) {
// trigger as a special reader.
return Collections.singletonList(new TriggerFileReader());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public boolean isSourceExist() {

@Override
public void init(JobProfile jobConf) {
this.triggerId = jobConf.get(JobConstants.JOB_TRIGGER);
this.triggerId = jobConf.get(JobConstants.JOB_FILE_JOB_TRIGGER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private void registerAllSubDir(
Map<String, String> taskProfile = new HashMap<>();
String md5 = AgentUtils.getFileMd5(path.toFile());
taskProfile.put(path.toFile().getAbsolutePath() + ".md5", md5);
taskProfile.put(JobConstants.JOB_TRIGGER, null); // del trigger id
taskProfile.put(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id
taskProfile.put(JobConstants.JOB_DIR_FILTER_PATTERNS, path.toFile().getAbsolutePath());
LOGGER.info("trigger_{} generate job profile to read file {}",
trigger.getTriggerProfile().getTriggerId(), path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public static JobProfile copyJobProfile(TriggerProfile triggerProfile, File pend
JobProfile copiedProfile = TriggerProfile.parseJsonStr(triggerProfile.toJsonStr());
String md5 = AgentUtils.getFileMd5(pendingFile);
copiedProfile.set(pendingFile.getAbsolutePath() + ".md5", md5);
copiedProfile.set(JobConstants.JOB_TRIGGER, null); // del trigger id
copiedProfile.set(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id
copiedProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, pendingFile.getAbsolutePath());
return copiedProfile;
}
Expand Down

0 comments on commit 23d08db

Please sign in to comment.