Skip to content

Commit

Permalink
[INLONG-8655][Agent] fix bug: JobWrapper thread leaks when the job is…
Browse files Browse the repository at this point in the history
… stopped
  • Loading branch information
justinwwhuang committed Aug 7, 2023
1 parent 47098f3 commit 561f4d5
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_MQ_TOPIC = "job.topicInfo";

// File job
public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger";
public static final String JOB_FILE_TRIGGER = "job.fileJob.trigger";
public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated
public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns";
public static final String JOB_DIR_FILTER_BLACKLIST = "job.fileJob.dir.blackList";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

import java.io.Closeable;

import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;

/**
Expand Down Expand Up @@ -87,7 +87,7 @@ public void storeJobConf(JobProfile jobProfile) {
// store job conf to bdb
if (jobProfile != null) {
// trigger job is a special kind of job
if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
if (jobProfile.hasKey(JOB_FILE_TRIGGER)) {
triggerManager.submitTrigger(
TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), true);
} else {
Expand Down Expand Up @@ -123,7 +123,7 @@ public void storeAgentConf(String confJsonStr) {
*/
public void deleteJobConf(JobProfile jobProfile) {
if (jobProfile != null) {
if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
if (jobProfile.hasKey(JOB_FILE_TRIGGER)) {
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(), false);
} else {
jobManager.deleteJob(jobProfile.getInstanceId(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@

package org.apache.inlong.agent.core.job;

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
Expand All @@ -31,32 +52,9 @@
import org.apache.inlong.agent.utils.GsonUtil;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricRegister;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;

/**
* JobManager maintains lots of jobs, and communicates between server and task manager.
*/
Expand Down Expand Up @@ -108,45 +106,44 @@ public JobManager(AgentManager agentManager, JobProfileDb jobProfileDb) {
this.dimensions = new HashMap<>();
this.dimensions.put(KEY_COMPONENT_NAME, this.getClass().getSimpleName());
this.jobMetrics = new AgentMetricItemSet(this.getClass().getSimpleName());
MetricRegister.unregister(jobMetrics);
MetricRegister.register(jobMetrics);
}

/**
* submit job to work thread.
* add file job profile
*
* @param job job
* @param profile job profile.
*/
private void addJob(Job job) {
if (pendingJobs.containsKey(job.getJobInstanceId())) {
return;
}
try {
JobWrapper jobWrapper = new JobWrapper(agentManager, job);
JobWrapper jobWrapperRet = jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper);
if (jobWrapperRet != null) {
LOGGER.warn("{} has been added to running pool, "
+ "cannot be added repeatedly", job.getJobInstanceId());
return;
} else {
getJobMetric().jobRunningCount.incrementAndGet();
}
this.runningPool.execute(jobWrapper);
} catch (Exception rje) {
LOGGER.debug("reject job {}", job.getJobInstanceId(), rje);
pendingJobs.putIfAbsent(job.getJobInstanceId(), job);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}
public boolean submitFileJobProfile(JobProfile profile) {
return submitJobProfile(profile, false, true);
}

/**
* add file job profile
* make up file job
*
* @param profile job profile.
*/
public boolean submitFileJobProfile(JobProfile profile) {
return submitJobProfile(profile, false, true);
public void makeUpJob(JobProfile profile, boolean singleJob) {
    LOGGER.error("need to make up job {}", profile);
    // Nothing can be rebuilt from a profile that fails validation.
    final boolean valid = isJobValid(profile);
    if (!valid) {
        LOGGER.error("make up job failed, invalid profile {}", profile);
        return;
    }

    // Derive the instance id: a fixed id for single jobs, an indexed unique id otherwise.
    final String jobId = profile.get(JOB_ID);
    final String instanceId = singleJob
            ? AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId)
            : AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet());
    profile.set(JOB_INSTANCE_ID, instanceId);

    // Prefer the profile already stored in the db (refreshing only its version);
    // otherwise store the incoming profile as a first-time job.
    final JobProfile stored = jobProfileDb.getJobById(profile.getInstanceId());
    final JobProfile effective;
    if (stored != null) {
        stored.set(JOB_VERSION, profile.get(JOB_VERSION));
        effective = stored;
    } else {
        jobProfileDb.storeJobFirstTime(profile);
        effective = profile;
    }

    LOGGER.info("submit job final profile {}", effective.toJsonStr());
    addJobToMemory(new Job(effective));
}

/**
Expand Down Expand Up @@ -177,10 +174,38 @@ public boolean submitJobProfile(JobProfile profile, boolean singleJob, boolean i
}
}
LOGGER.info("submit job final profile {}", profile.toJsonStr());
addJob(new Job(profile));
addJobToMemory(new Job(profile));
return true;
}

/**
 * Registers the job in the in-memory job map and submits it to the running pool.
 *
 * <p>A job that is already waiting in {@code pendingJobs}, or whose instance id is already
 * present in {@code jobs}, is ignored. When creating or submitting the wrapper fails with an
 * exception, the job is parked in {@code pendingJobs} so that it can be submitted again later.
 * In that case the registration made by this attempt (map entry and running counter) is rolled
 * back; otherwise the stale entry would make every later attempt look like a duplicate and the
 * job would never be executed.
 *
 * @param job job
 */
private void addJobToMemory(Job job) {
    if (pendingJobs.containsKey(job.getJobInstanceId())) {
        return;
    }
    JobWrapper jobWrapper = null;
    String registeredId = null;
    try {
        jobWrapper = new JobWrapper(agentManager, job);
        String wrapperId = jobWrapper.getJob().getJobInstanceId();
        if (jobs.putIfAbsent(wrapperId, jobWrapper) != null) {
            LOGGER.warn("{} has been added to running pool, "
                    + "cannot be added repeatedly", job.getJobInstanceId());
            return;
        }
        getJobMetric().jobRunningCount.incrementAndGet();
        // From here on this attempt owns a map entry and a counter increment.
        registeredId = wrapperId;
        this.runningPool.execute(jobWrapper);
    } catch (Exception rje) {
        LOGGER.debug("reject job {}", job.getJobInstanceId(), rje);
        // Undo this attempt's registration; remove(key, value) only removes our own wrapper,
        // so an entry installed concurrently by someone else is left untouched.
        if (registeredId != null && jobs.remove(registeredId, jobWrapper)) {
            getJobMetric().jobRunningCount.decrementAndGet();
        }
        pendingJobs.putIfAbsent(job.getJobInstanceId(), job);
    } catch (Throwable t) {
        ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
    }
}

private boolean isJobValid(JobProfile profile) {
if (profile == null || !profile.allRequiredKeyExist()) {
LOGGER.error("profile is null or not all required key exists {}", profile == null ? null
Expand Down Expand Up @@ -227,7 +252,7 @@ private void startJobs() {
List<JobProfile> profileList = jobProfileDb.getRestartJobs();
for (JobProfile profile : profileList) {
LOGGER.info("init starting job from db {}", profile.toJsonStr());
addJob(new Job(profile));
addJobToMemory(new Job(profile));
}
}

Expand All @@ -242,7 +267,7 @@ public Runnable jobStateCheckThread() {
for (String jobId : pendingJobs.keySet()) {
Job job = pendingJobs.remove(jobId);
if (job != null) {
addJob(job);
addJobToMemory(job);
}
}
TimeUnit.SECONDS.sleep(monitorInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@

package org.apache.inlong.agent.core.job;

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
import static org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
Expand All @@ -35,26 +50,9 @@
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.db.CommandEntity;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
import static org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER;

/**
* JobWrapper is used in JobManager, it defines the life cycle of
* running job and maintains the state of job.
Expand Down Expand Up @@ -204,6 +202,7 @@ public void submit(JobProfile taskProfile) {
public void cleanup() {
// flag this wrapper as ended before its tasks are released
isEnd = true;
// ask the task manager to remove every task tracked by this wrapper, by task id
allTasks.forEach(task -> taskManager.removeTask(task.getTaskId()));
// forget the tracked tasks so this wrapper no longer holds references to them
allTasks.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@

package org.apache.inlong.agent.core.trigger;

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
Expand All @@ -28,21 +38,9 @@
import org.apache.inlong.agent.db.TriggerProfileDb;
import org.apache.inlong.agent.plugin.Trigger;
import org.apache.inlong.agent.utils.ThreadUtils;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;

/**
* manager for triggers.
*/
Expand All @@ -67,13 +65,13 @@ public TriggerManager(AgentManager manager, TriggerProfileDb triggerProfileDb) {
}

/**
* Restore trigger task.
* add trigger task to memory.
*
* @param triggerProfile trigger profile
*/
public boolean restoreTrigger(TriggerProfile triggerProfile) {
public boolean addTriggerToMemory(TriggerProfile triggerProfile) {
try {
Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER));
Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_TRIGGER));
Trigger trigger = (Trigger) triggerClass.newInstance();
String triggerId = triggerProfile.get(JOB_ID);
if (triggerMap.containsKey(triggerId)) {
Expand Down Expand Up @@ -120,7 +118,7 @@ public void submitTrigger(TriggerProfile triggerProfile, boolean isNewJob) {
// This action must be done before saving in db, because the job.instance.id is needed for the next recovery
manager.getJobManager().submitJobProfile(triggerProfile, true, isNewJob);
triggerProfileDB.storeTrigger(triggerProfile);
restoreTrigger(triggerProfile);
addTriggerToMemory(triggerProfile);
}

/**
Expand Down Expand Up @@ -160,6 +158,7 @@ private Runnable jobFetchThread() {
JobWrapper job = jobWrapperMap.get(trigger.getTriggerProfile().getInstanceId());
if (job == null) {
LOGGER.error("job {} should not be null", trigger.getTriggerProfile().getInstanceId());
manager.getJobManager().makeUpJob(trigger.getTriggerProfile(), true);
return;
}
String subTaskFile = profile.getOrDefault(JobConstants.JOB_DIR_FILTER_PATTERNS, "");
Expand All @@ -171,7 +170,7 @@ private Runnable jobFetchThread() {
// necessary to filter the stated monitored file task.

boolean alreadyExistTask = job.exist(tasks -> tasks.stream()
.filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER))
.filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_TRIGGER))
.filter(task -> subTaskFile.equals(
task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, "")))
.findAny().isPresent());
Expand Down Expand Up @@ -202,7 +201,7 @@ private void initTriggers() {
// fetch all triggers from db
List<TriggerProfile> profileList = triggerProfileDB.getTriggers();
for (TriggerProfile profile : profileList) {
restoreTrigger(profile);
addTriggerToMemory(profile);
}
}

Expand Down
Loading

0 comments on commit 561f4d5

Please sign in to comment.